mirror of
https://github.com/esphome/esphome.git
synced 2025-03-14 06:38:17 +00:00
577 lines
16 KiB
Python
577 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
from pathlib import Path
|
|
import re
|
|
|
|
from PIL import Image, UnidentifiedImageError
|
|
|
|
from esphome import core, external_files
|
|
import esphome.codegen as cg
|
|
import esphome.config_validation as cv
|
|
from esphome.const import (
|
|
CONF_DITHER,
|
|
CONF_FILE,
|
|
CONF_ICON,
|
|
CONF_ID,
|
|
CONF_PATH,
|
|
CONF_RAW_DATA_ID,
|
|
CONF_RESIZE,
|
|
CONF_SOURCE,
|
|
CONF_TYPE,
|
|
CONF_URL,
|
|
)
|
|
from esphome.core import CORE, HexInt
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
DOMAIN = "image"
|
|
DEPENDENCIES = ["display"]
|
|
|
|
image_ns = cg.esphome_ns.namespace("image")
|
|
|
|
ImageType = image_ns.enum("ImageType")
|
|
|
|
CONF_OPAQUE = "opaque"
|
|
CONF_CHROMA_KEY = "chroma_key"
|
|
CONF_ALPHA_CHANNEL = "alpha_channel"
|
|
CONF_INVERT_ALPHA = "invert_alpha"
|
|
|
|
TRANSPARENCY_TYPES = (
|
|
CONF_OPAQUE,
|
|
CONF_CHROMA_KEY,
|
|
CONF_ALPHA_CHANNEL,
|
|
)
|
|
|
|
|
|
def get_image_type_enum(type):
|
|
return getattr(ImageType, f"IMAGE_TYPE_{type.upper()}")
|
|
|
|
|
|
def get_transparency_enum(transparency):
|
|
return getattr(TransparencyType, f"TRANSPARENCY_{transparency.upper()}")
|
|
|
|
|
|
class ImageEncoder:
|
|
"""
|
|
Superclass of image type encoders
|
|
"""
|
|
|
|
# Control which transparency options are available for a given type
|
|
allow_config = {CONF_ALPHA_CHANNEL, CONF_CHROMA_KEY, CONF_OPAQUE}
|
|
|
|
# All imageencoder types are valid
|
|
@staticmethod
|
|
def validate(value):
|
|
return value
|
|
|
|
def __init__(self, width, height, transparency, dither, invert_alpha):
|
|
"""
|
|
:param width: The image width in pixels
|
|
:param height: The image height in pixels
|
|
:param transparency: Transparency type
|
|
:param dither: Dither method
|
|
:param invert_alpha: True if the alpha channel should be inverted; for monochrome formats inverts the colours.
|
|
"""
|
|
self.transparency = transparency
|
|
self.width = width
|
|
self.height = height
|
|
self.data = [0 for _ in range(width * height)]
|
|
self.dither = dither
|
|
self.index = 0
|
|
self.invert_alpha = invert_alpha
|
|
|
|
def convert(self, image):
|
|
"""
|
|
Convert the image format
|
|
:param image: Input image
|
|
:return: converted image
|
|
"""
|
|
return image
|
|
|
|
def encode(self, pixel):
|
|
"""
|
|
Encode a single pixel
|
|
"""
|
|
|
|
def end_row(self):
|
|
"""
|
|
Marks the end of a pixel row
|
|
:return:
|
|
"""
|
|
|
|
|
|
class ImageBinary(ImageEncoder):
|
|
allow_config = {CONF_OPAQUE, CONF_INVERT_ALPHA, CONF_CHROMA_KEY}
|
|
|
|
def __init__(self, width, height, transparency, dither, invert_alpha):
|
|
self.width8 = (width + 7) // 8
|
|
super().__init__(self.width8, height, transparency, dither, invert_alpha)
|
|
self.bitno = 0
|
|
|
|
def convert(self, image):
|
|
return image.convert("1", dither=self.dither)
|
|
|
|
def encode(self, pixel):
|
|
if self.invert_alpha:
|
|
pixel = not pixel
|
|
if pixel:
|
|
self.data[self.index] |= 0x80 >> (self.bitno % 8)
|
|
self.bitno += 1
|
|
if self.bitno == 8:
|
|
self.bitno = 0
|
|
self.index += 1
|
|
|
|
def end_row(self):
|
|
"""
|
|
Pad rows to a byte boundary
|
|
"""
|
|
if self.bitno != 0:
|
|
self.bitno = 0
|
|
self.index += 1
|
|
|
|
|
|
class ImageGrayscale(ImageEncoder):
|
|
allow_config = {CONF_ALPHA_CHANNEL, CONF_CHROMA_KEY, CONF_INVERT_ALPHA, CONF_OPAQUE}
|
|
|
|
def convert(self, image):
|
|
return image.convert("LA")
|
|
|
|
def encode(self, pixel):
|
|
b, a = pixel
|
|
if self.transparency == CONF_CHROMA_KEY:
|
|
if b == 1:
|
|
b = 0
|
|
if a != 0xFF:
|
|
b = 1
|
|
if self.invert_alpha:
|
|
b ^= 0xFF
|
|
if self.transparency == CONF_ALPHA_CHANNEL:
|
|
if a != 0xFF:
|
|
b = a
|
|
self.data[self.index] = b
|
|
self.index += 1
|
|
|
|
|
|
class ImageRGB565(ImageEncoder):
|
|
def __init__(self, width, height, transparency, dither, invert_alpha):
|
|
stride = 3 if transparency == CONF_ALPHA_CHANNEL else 2
|
|
super().__init__(
|
|
width * stride,
|
|
height,
|
|
transparency,
|
|
dither,
|
|
invert_alpha,
|
|
)
|
|
|
|
def convert(self, image):
|
|
return image.convert("RGBA")
|
|
|
|
def encode(self, pixel):
|
|
r, g, b, a = pixel
|
|
r = r >> 3
|
|
g = g >> 2
|
|
b = b >> 3
|
|
if self.transparency == CONF_CHROMA_KEY:
|
|
if r == 0 and g == 1 and b == 0:
|
|
g = 0
|
|
elif a < 128:
|
|
r = 0
|
|
g = 1
|
|
b = 0
|
|
rgb = (r << 11) | (g << 5) | b
|
|
self.data[self.index] = rgb >> 8
|
|
self.index += 1
|
|
self.data[self.index] = rgb & 0xFF
|
|
self.index += 1
|
|
if self.transparency == CONF_ALPHA_CHANNEL:
|
|
if self.invert_alpha:
|
|
a ^= 0xFF
|
|
self.data[self.index] = a
|
|
self.index += 1
|
|
|
|
|
|
class ImageRGB(ImageEncoder):
|
|
def __init__(self, width, height, transparency, dither, invert_alpha):
|
|
stride = 4 if transparency == CONF_ALPHA_CHANNEL else 3
|
|
super().__init__(
|
|
width * stride,
|
|
height,
|
|
transparency,
|
|
dither,
|
|
invert_alpha,
|
|
)
|
|
|
|
def convert(self, image):
|
|
return image.convert("RGBA")
|
|
|
|
def encode(self, pixel):
|
|
r, g, b, a = pixel
|
|
if self.transparency == CONF_CHROMA_KEY:
|
|
if r == 0 and g == 1 and b == 0:
|
|
g = 0
|
|
elif a < 128:
|
|
r = 0
|
|
g = 1
|
|
b = 0
|
|
self.data[self.index] = r
|
|
self.index += 1
|
|
self.data[self.index] = g
|
|
self.index += 1
|
|
self.data[self.index] = b
|
|
self.index += 1
|
|
if self.transparency == CONF_ALPHA_CHANNEL:
|
|
if self.invert_alpha:
|
|
a ^= 0xFF
|
|
self.data[self.index] = a
|
|
self.index += 1
|
|
|
|
|
|
class ReplaceWith:
|
|
"""
|
|
Placeholder class to provide feedback on deprecated features
|
|
"""
|
|
|
|
allow_config = {CONF_ALPHA_CHANNEL, CONF_CHROMA_KEY, CONF_OPAQUE}
|
|
|
|
def __init__(self, replace_with):
|
|
self.replace_with = replace_with
|
|
|
|
def validate(self, value):
|
|
raise cv.Invalid(
|
|
f"Image type {value} is removed; replace with {self.replace_with}"
|
|
)
|
|
|
|
|
|
IMAGE_TYPE = {
|
|
"BINARY": ImageBinary,
|
|
"GRAYSCALE": ImageGrayscale,
|
|
"RGB565": ImageRGB565,
|
|
"RGB": ImageRGB,
|
|
"TRANSPARENT_BINARY": ReplaceWith(
|
|
"'type: BINARY' and 'use_transparency: chroma_key'"
|
|
),
|
|
"RGB24": ReplaceWith("'type: RGB'"),
|
|
"RGBA": ReplaceWith("'type: RGB' and 'use_transparency: alpha_channel'"),
|
|
}
|
|
|
|
TransparencyType = image_ns.enum("TransparencyType")
|
|
|
|
CONF_USE_TRANSPARENCY = "use_transparency"
|
|
|
|
# If the MDI file cannot be downloaded within this time, abort.
|
|
IMAGE_DOWNLOAD_TIMEOUT = 30 # seconds
|
|
|
|
SOURCE_LOCAL = "local"
|
|
SOURCE_MDI = "mdi"
|
|
SOURCE_WEB = "web"
|
|
|
|
Image_ = image_ns.class_("Image")
|
|
|
|
|
|
def compute_local_image_path(value) -> Path:
|
|
url = value[CONF_URL] if isinstance(value, dict) else value
|
|
h = hashlib.new("sha256")
|
|
h.update(url.encode())
|
|
key = h.hexdigest()[:8]
|
|
base_dir = external_files.compute_local_file_dir(DOMAIN)
|
|
return base_dir / key
|
|
|
|
|
|
def local_path(value):
|
|
value = value[CONF_PATH] if isinstance(value, dict) else value
|
|
return str(CORE.relative_config_path(value))
|
|
|
|
|
|
def download_file(url, path):
|
|
external_files.download_content(url, path, IMAGE_DOWNLOAD_TIMEOUT)
|
|
return str(path)
|
|
|
|
|
|
def download_mdi(value):
|
|
mdi_id = value[CONF_ICON] if isinstance(value, dict) else value
|
|
base_dir = external_files.compute_local_file_dir(DOMAIN) / "mdi"
|
|
path = base_dir / f"{mdi_id}.svg"
|
|
|
|
url = f"https://raw.githubusercontent.com/Templarian/MaterialDesign/master/svg/{mdi_id}.svg"
|
|
return download_file(url, path)
|
|
|
|
|
|
def download_image(value):
|
|
value = value[CONF_URL] if isinstance(value, dict) else value
|
|
return download_file(value, compute_local_image_path(value))
|
|
|
|
|
|
def is_svg_file(file):
|
|
if not file:
|
|
return False
|
|
with open(file, "rb") as f:
|
|
return "<svg " in str(f.read(1024))
|
|
|
|
|
|
def validate_cairosvg_installed():
|
|
try:
|
|
import cairosvg
|
|
except ImportError as err:
|
|
raise cv.Invalid(
|
|
"Please install the cairosvg python package to use this feature. "
|
|
"(pip install cairosvg)"
|
|
) from err
|
|
|
|
major, minor, _ = cairosvg.__version__.split(".")
|
|
if major < "2" or major == "2" and minor < "2":
|
|
raise cv.Invalid(
|
|
"Please update your cairosvg installation to at least 2.2.0. "
|
|
"(pip install -U cairosvg)"
|
|
)
|
|
|
|
|
|
def validate_file_shorthand(value):
|
|
value = cv.string_strict(value)
|
|
if value.startswith("mdi:"):
|
|
match = re.search(r"mdi:([a-zA-Z0-9\-]+)", value)
|
|
if match is None:
|
|
raise cv.Invalid("Could not parse mdi icon name.")
|
|
icon = match.group(1)
|
|
return download_mdi(icon)
|
|
|
|
if value.startswith("http://") or value.startswith("https://"):
|
|
return download_image(value)
|
|
|
|
value = cv.file_(value)
|
|
return local_path(value)
|
|
|
|
|
|
LOCAL_SCHEMA = cv.All(
|
|
{
|
|
cv.Required(CONF_PATH): cv.file_,
|
|
},
|
|
local_path,
|
|
)
|
|
|
|
MDI_SCHEMA = cv.All(
|
|
{
|
|
cv.Required(CONF_ICON): cv.string,
|
|
},
|
|
download_mdi,
|
|
)
|
|
|
|
WEB_SCHEMA = cv.All(
|
|
{
|
|
cv.Required(CONF_URL): cv.string,
|
|
},
|
|
download_image,
|
|
)
|
|
|
|
TYPED_FILE_SCHEMA = cv.typed_schema(
|
|
{
|
|
SOURCE_LOCAL: LOCAL_SCHEMA,
|
|
SOURCE_MDI: MDI_SCHEMA,
|
|
SOURCE_WEB: WEB_SCHEMA,
|
|
},
|
|
key=CONF_SOURCE,
|
|
)
|
|
|
|
|
|
def validate_transparency(choices=TRANSPARENCY_TYPES):
|
|
def validate(value):
|
|
if isinstance(value, bool):
|
|
value = str(value)
|
|
return cv.one_of(*choices, lower=True)(value)
|
|
|
|
return validate
|
|
|
|
|
|
def validate_type(image_types):
|
|
def validate(value):
|
|
value = cv.one_of(*image_types, upper=True)(value)
|
|
return IMAGE_TYPE[value].validate(value)
|
|
|
|
return validate
|
|
|
|
|
|
def validate_settings(value):
|
|
type = value[CONF_TYPE]
|
|
transparency = value[CONF_USE_TRANSPARENCY].lower()
|
|
allow_config = IMAGE_TYPE[type].allow_config
|
|
if transparency not in allow_config:
|
|
raise cv.Invalid(
|
|
f"Image format '{type}' cannot have transparency: {transparency}"
|
|
)
|
|
invert_alpha = value.get(CONF_INVERT_ALPHA, False)
|
|
if (
|
|
invert_alpha
|
|
and transparency != CONF_ALPHA_CHANNEL
|
|
and CONF_INVERT_ALPHA not in allow_config
|
|
):
|
|
raise cv.Invalid("No alpha channel to invert")
|
|
if file := value.get(CONF_FILE):
|
|
file = Path(file)
|
|
if is_svg_file(file):
|
|
validate_cairosvg_installed()
|
|
else:
|
|
try:
|
|
Image.open(file)
|
|
except UnidentifiedImageError as exc:
|
|
raise cv.Invalid(f"File can't be opened as image: {file}") from exc
|
|
return value
|
|
|
|
|
|
BASE_SCHEMA = cv.Schema(
|
|
{
|
|
cv.Required(CONF_ID): cv.declare_id(Image_),
|
|
cv.Required(CONF_FILE): cv.Any(validate_file_shorthand, TYPED_FILE_SCHEMA),
|
|
cv.Optional(CONF_RESIZE): cv.dimensions,
|
|
cv.Optional(CONF_DITHER, default="NONE"): cv.one_of(
|
|
"NONE", "FLOYDSTEINBERG", upper=True
|
|
),
|
|
cv.Optional(CONF_INVERT_ALPHA, default=False): cv.boolean,
|
|
cv.GenerateID(CONF_RAW_DATA_ID): cv.declare_id(cg.uint8),
|
|
}
|
|
).add_extra(validate_settings)
|
|
|
|
IMAGE_SCHEMA = BASE_SCHEMA.extend(
|
|
{
|
|
cv.Required(CONF_TYPE): validate_type(IMAGE_TYPE),
|
|
cv.Optional(
|
|
CONF_USE_TRANSPARENCY, default=CONF_OPAQUE
|
|
): validate_transparency(),
|
|
}
|
|
)
|
|
|
|
|
|
def typed_image_schema(image_type):
|
|
"""
|
|
Construct a schema for a specific image type, allowing transparency options
|
|
"""
|
|
return cv.Any(
|
|
cv.Schema(
|
|
{
|
|
cv.Optional(t.lower()): cv.ensure_list(
|
|
BASE_SCHEMA.extend(
|
|
{
|
|
cv.Optional(
|
|
CONF_USE_TRANSPARENCY, default=t
|
|
): validate_transparency((t,)),
|
|
cv.Optional(CONF_TYPE, default=image_type): validate_type(
|
|
(image_type,)
|
|
),
|
|
}
|
|
)
|
|
)
|
|
for t in IMAGE_TYPE[image_type].allow_config.intersection(
|
|
TRANSPARENCY_TYPES
|
|
)
|
|
}
|
|
),
|
|
# Allow a default configuration with no transparency preselected
|
|
cv.ensure_list(
|
|
BASE_SCHEMA.extend(
|
|
{
|
|
cv.Optional(
|
|
CONF_USE_TRANSPARENCY, default=CONF_OPAQUE
|
|
): validate_transparency(),
|
|
cv.Optional(CONF_TYPE, default=image_type): validate_type(
|
|
(image_type,)
|
|
),
|
|
}
|
|
)
|
|
),
|
|
)
|
|
|
|
|
|
# The config schema can be a (possibly empty) single list of images,
|
|
# or a dictionary of image types each with a list of images
|
|
CONFIG_SCHEMA = cv.Any(
|
|
cv.Schema({cv.Optional(t.lower()): typed_image_schema(t) for t in IMAGE_TYPE}),
|
|
cv.ensure_list(IMAGE_SCHEMA),
|
|
)
|
|
|
|
|
|
async def write_image(config, all_frames=False):
|
|
path = Path(config[CONF_FILE])
|
|
if not path.is_file():
|
|
raise core.EsphomeError(f"Could not load image file {path}")
|
|
|
|
resize = config.get(CONF_RESIZE)
|
|
if is_svg_file(path):
|
|
# Local import so use of non-SVG files needn't require cairosvg installed
|
|
from cairosvg import svg2png
|
|
|
|
if not resize:
|
|
resize = (None, None)
|
|
with open(path, "rb") as file:
|
|
image = svg2png(
|
|
file_obj=file,
|
|
output_width=resize[0],
|
|
output_height=resize[1],
|
|
)
|
|
image = Image.open(io.BytesIO(image))
|
|
width, height = image.size
|
|
else:
|
|
image = Image.open(path)
|
|
width, height = image.size
|
|
if resize:
|
|
# Preserve aspect ratio
|
|
new_width_max = min(width, resize[0])
|
|
new_height_max = min(height, resize[1])
|
|
ratio = min(new_width_max / width, new_height_max / height)
|
|
width, height = int(width * ratio), int(height * ratio)
|
|
|
|
if not resize and (width > 500 or height > 500):
|
|
_LOGGER.warning(
|
|
'The image "%s" you requested is very big. Please consider'
|
|
" using the resize parameter.",
|
|
path,
|
|
)
|
|
|
|
dither = (
|
|
Image.Dither.NONE
|
|
if config[CONF_DITHER] == "NONE"
|
|
else Image.Dither.FLOYDSTEINBERG
|
|
)
|
|
type = config[CONF_TYPE]
|
|
transparency = config[CONF_USE_TRANSPARENCY]
|
|
invert_alpha = config[CONF_INVERT_ALPHA]
|
|
frame_count = 1
|
|
if all_frames:
|
|
try:
|
|
frame_count = image.n_frames
|
|
except AttributeError:
|
|
pass
|
|
if frame_count <= 1:
|
|
_LOGGER.warning("Image file %s has no animation frames", path)
|
|
|
|
total_rows = height * frame_count
|
|
encoder = IMAGE_TYPE[type](width, total_rows, transparency, dither, invert_alpha)
|
|
for frame_index in range(frame_count):
|
|
image.seek(frame_index)
|
|
pixels = encoder.convert(image.resize((width, height))).getdata()
|
|
for row in range(height):
|
|
for col in range(width):
|
|
encoder.encode(pixels[row * width + col])
|
|
encoder.end_row()
|
|
|
|
rhs = [HexInt(x) for x in encoder.data]
|
|
prog_arr = cg.progmem_array(config[CONF_RAW_DATA_ID], rhs)
|
|
image_type = get_image_type_enum(type)
|
|
trans_value = get_transparency_enum(transparency)
|
|
|
|
return prog_arr, width, height, image_type, trans_value, frame_count
|
|
|
|
|
|
async def to_code(config):
|
|
if isinstance(config, list):
|
|
for entry in config:
|
|
await to_code(entry)
|
|
elif CONF_ID not in config:
|
|
for entry in config.values():
|
|
await to_code(entry)
|
|
else:
|
|
prog_arr, width, height, image_type, trans_value, _ = await write_image(config)
|
|
cg.new_Pvariable(
|
|
config[CONF_ID], prog_arr, width, height, image_type, trans_value
|
|
)
|