mirror of
				https://github.com/esphome/esphome.git
				synced 2025-11-04 09:01:49 +00:00 
			
		
		
		
	Convert time to use tzdata (#2425)
This commit is contained in:
		@@ -1,10 +1,8 @@
 | 
			
		||||
import bisect
 | 
			
		||||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
import math
 | 
			
		||||
import string
 | 
			
		||||
from importlib import resources
 | 
			
		||||
from typing import Optional
 | 
			
		||||
from datetime import timezone
 | 
			
		||||
 | 
			
		||||
import pytz
 | 
			
		||||
import tzlocal
 | 
			
		||||
 | 
			
		||||
import esphome.codegen as cg
 | 
			
		||||
@@ -44,111 +42,47 @@ ESPTime = time_ns.struct("ESPTime")
 | 
			
		||||
TimeHasTimeCondition = time_ns.class_("TimeHasTimeCondition", Condition)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _tz_timedelta(td):
 | 
			
		||||
    offset_hour = int(td.total_seconds() / (60 * 60))
 | 
			
		||||
    offset_minute = int(abs(td.total_seconds() / 60)) % 60
 | 
			
		||||
    offset_second = int(abs(td.total_seconds())) % 60
 | 
			
		||||
    if offset_hour == 0 and offset_minute == 0 and offset_second == 0:
 | 
			
		||||
        return "0"
 | 
			
		||||
    if offset_minute == 0 and offset_second == 0:
 | 
			
		||||
        return f"{offset_hour}"
 | 
			
		||||
    if offset_second == 0:
 | 
			
		||||
        return f"{offset_hour}:{offset_minute}"
 | 
			
		||||
    return f"{offset_hour}:{offset_minute}:{offset_second}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# https://stackoverflow.com/a/16804556/8924614
 | 
			
		||||
def _week_of_month(dt):
 | 
			
		||||
    first_day = dt.replace(day=1)
 | 
			
		||||
    dom = dt.day
 | 
			
		||||
    adjusted_dom = dom + first_day.weekday()
 | 
			
		||||
    return int(math.ceil(adjusted_dom / 7.0))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _tz_dst_str(dt):
 | 
			
		||||
    td = datetime.timedelta(hours=dt.hour, minutes=dt.minute, seconds=dt.second)
 | 
			
		||||
    return f"M{dt.month}.{_week_of_month(dt)}.{dt.isoweekday() % 7}/{_tz_timedelta(td)}"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _safe_tzname(tz, dt):
 | 
			
		||||
    tzname = tz.tzname(dt)
 | 
			
		||||
    # pytz does not always return valid tznames
 | 
			
		||||
    # For example: 'Europe/Saratov' returns '+04'
 | 
			
		||||
    # Work around it by using a generic name for the timezone
 | 
			
		||||
    if not all(c in string.ascii_letters for c in tzname):
 | 
			
		||||
        return "TZ"
 | 
			
		||||
    return tzname
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _non_dst_tz(tz, dt):
 | 
			
		||||
    tzname = _safe_tzname(tz, dt)
 | 
			
		||||
    utcoffset = tz.utcoffset(dt)
 | 
			
		||||
    _LOGGER.info(
 | 
			
		||||
        "Detected timezone '%s' with UTC offset %s", tzname, _tz_timedelta(utcoffset)
 | 
			
		||||
    )
 | 
			
		||||
    tzbase = f"{tzname}{_tz_timedelta(-1 * utcoffset)}"
 | 
			
		||||
    return tzbase
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def convert_tz(pytz_obj):
 | 
			
		||||
    tz = pytz_obj
 | 
			
		||||
 | 
			
		||||
    now = datetime.datetime.now()
 | 
			
		||||
    first_january = datetime.datetime(year=now.year, month=1, day=1)
 | 
			
		||||
 | 
			
		||||
    if not isinstance(tz, pytz.tzinfo.DstTzInfo):
 | 
			
		||||
        return _non_dst_tz(tz, first_january)
 | 
			
		||||
 | 
			
		||||
    # pylint: disable=protected-access
 | 
			
		||||
    transition_times = tz._utc_transition_times
 | 
			
		||||
    transition_info = tz._transition_info
 | 
			
		||||
    idx = max(0, bisect.bisect_right(transition_times, now))
 | 
			
		||||
    if idx >= len(transition_times):
 | 
			
		||||
        return _non_dst_tz(tz, now)
 | 
			
		||||
 | 
			
		||||
    idx1, idx2 = idx, idx + 1
 | 
			
		||||
    dstoffset1 = transition_info[idx1][1]
 | 
			
		||||
    if dstoffset1 == datetime.timedelta(seconds=0):
 | 
			
		||||
        # Normalize to 1 being DST on
 | 
			
		||||
        idx1, idx2 = idx + 1, idx + 2
 | 
			
		||||
 | 
			
		||||
    if idx2 >= len(transition_times):
 | 
			
		||||
        return _non_dst_tz(tz, now)
 | 
			
		||||
 | 
			
		||||
    if transition_times[idx2].year > now.year + 1:
 | 
			
		||||
        # Next transition is scheduled after this year
 | 
			
		||||
        # Probably a scheduler timezone change.
 | 
			
		||||
        return _non_dst_tz(tz, now)
 | 
			
		||||
 | 
			
		||||
    utcoffset_on, _, tzname_on = transition_info[idx1]
 | 
			
		||||
    utcoffset_off, _, tzname_off = transition_info[idx2]
 | 
			
		||||
    dst_begins_utc = transition_times[idx1]
 | 
			
		||||
    dst_begins_local = dst_begins_utc + utcoffset_off
 | 
			
		||||
    dst_ends_utc = transition_times[idx2]
 | 
			
		||||
    dst_ends_local = dst_ends_utc + utcoffset_on
 | 
			
		||||
 | 
			
		||||
    tzbase = f"{tzname_off}{_tz_timedelta(-1 * utcoffset_off)}"
 | 
			
		||||
 | 
			
		||||
    tzext = f"{tzname_on}{_tz_timedelta(-1 * utcoffset_on)},{_tz_dst_str(dst_begins_local)},{_tz_dst_str(dst_ends_local)}"
 | 
			
		||||
    _LOGGER.info(
 | 
			
		||||
        "Detected timezone '%s' with UTC offset %s and daylight saving time from "
 | 
			
		||||
        "%s to %s",
 | 
			
		||||
        tzname_off,
 | 
			
		||||
        _tz_timedelta(utcoffset_off),
 | 
			
		||||
        dst_begins_local.strftime("%d %B %X"),
 | 
			
		||||
        dst_ends_local.strftime("%d %B %X"),
 | 
			
		||||
    )
 | 
			
		||||
    return tzbase + tzext
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def detect_tz():
 | 
			
		||||
def _load_tzdata(iana_key: str) -> Optional[bytes]:
 | 
			
		||||
    # From https://tzdata.readthedocs.io/en/latest/#examples
 | 
			
		||||
    try:
 | 
			
		||||
        tz = tzlocal.get_localzone()
 | 
			
		||||
    except pytz.exceptions.UnknownTimeZoneError:
 | 
			
		||||
        _LOGGER.warning("Could not auto-detect timezone. Using UTC...")
 | 
			
		||||
        return "UTC"
 | 
			
		||||
        package_loc, resource = iana_key.rsplit("/", 1)
 | 
			
		||||
    except ValueError:
 | 
			
		||||
        return None
 | 
			
		||||
    package = "tzdata.zoneinfo." + package_loc.replace("/", ".")
 | 
			
		||||
 | 
			
		||||
    return convert_tz(tz)
 | 
			
		||||
    try:
 | 
			
		||||
        return resources.read_binary(package, resource)
 | 
			
		||||
    except (FileNotFoundError, ModuleNotFoundError):
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _extract_tz_string(tzfile: bytes) -> str:
 | 
			
		||||
    try:
 | 
			
		||||
        return tzfile.split(b"\n")[-2].decode()
 | 
			
		||||
    except (IndexError, UnicodeDecodeError):
 | 
			
		||||
        _LOGGER.error("Could not determine TZ string. Please report this issue.")
 | 
			
		||||
        _LOGGER.error("tzfile contents: %s", tzfile, exc_info=True)
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def detect_tz() -> str:
 | 
			
		||||
    localzone = tzlocal.get_localzone()
 | 
			
		||||
    if localzone is timezone.utc:
 | 
			
		||||
        return "UTC0"
 | 
			
		||||
    if not hasattr(localzone, "key"):
 | 
			
		||||
        raise cv.Invalid(
 | 
			
		||||
            "Could not automatically determine timezone, please set timezone manually."
 | 
			
		||||
        )
 | 
			
		||||
    iana_key = localzone.key
 | 
			
		||||
    _LOGGER.info("Detected timezone '%s'", iana_key)
 | 
			
		||||
    tzfile = _load_tzdata(iana_key)
 | 
			
		||||
    if tzfile is None:
 | 
			
		||||
        raise cv.Invalid(
 | 
			
		||||
            "Could not automatically determine timezone, please set timezone manually."
 | 
			
		||||
        )
 | 
			
		||||
    ret = _extract_tz_string(tzfile)
 | 
			
		||||
    _LOGGER.debug(" -> TZ string %s", ret)
 | 
			
		||||
    return ret
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _parse_cron_int(value, special_mapping, message):
 | 
			
		||||
@@ -327,15 +261,15 @@ def validate_cron_keys(value):
 | 
			
		||||
    return cv.has_at_least_one_key(*CRON_KEYS)(value)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def validate_tz(value):
 | 
			
		||||
def validate_tz(value: str) -> str:
 | 
			
		||||
    value = cv.string_strict(value)
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        pytz_obj = pytz.timezone(value)
 | 
			
		||||
    except pytz.UnknownTimeZoneError:  # pylint: disable=broad-except
 | 
			
		||||
    tzfile = _load_tzdata(value)
 | 
			
		||||
    if tzfile is None:
 | 
			
		||||
        # Not a IANA key, probably a TZ string
 | 
			
		||||
        return value
 | 
			
		||||
 | 
			
		||||
    return convert_tz(pytz_obj)
 | 
			
		||||
    return _extract_tz_string(tzfile)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
TIME_SCHEMA = cv.Schema(
 | 
			
		||||
 
 | 
			
		||||
@@ -3,8 +3,8 @@ PyYAML==5.4.1
 | 
			
		||||
paho-mqtt==1.5.1
 | 
			
		||||
colorama==0.4.4
 | 
			
		||||
tornado==6.1
 | 
			
		||||
tzlocal==3.0
 | 
			
		||||
pytz==2021.1
 | 
			
		||||
tzlocal==3.0    # from time
 | 
			
		||||
tzdata>=2021.1  # from time
 | 
			
		||||
pyserial==3.5
 | 
			
		||||
platformio==5.2.0
 | 
			
		||||
esptool==3.1
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user