diff --git a/esphome/components/esp32/__init__.py b/esphome/components/esp32/__init__.py
index cda02f80c2..9b83d144f8 100644
--- a/esphome/components/esp32/__init__.py
+++ b/esphome/components/esp32/__init__.py
@@ -386,10 +386,21 @@ FRAMEWORK_SCHEMA = cv.typed_schema(
 )
 
 
+FLASH_SIZES = [
+    "4MB",
+    "8MB",
+    "16MB",
+    "32MB",
+]
+
+CONF_FLASH_SIZE = "flash_size"
 CONFIG_SCHEMA = cv.All(
     cv.Schema(
         {
             cv.Required(CONF_BOARD): cv.string_strict,
+            cv.Optional(CONF_FLASH_SIZE, default="4MB"): cv.one_of(
+                *FLASH_SIZES, upper=True
+            ),
             cv.Optional(CONF_VARIANT): cv.one_of(*VARIANTS, upper=True),
             cv.Optional(CONF_FRAMEWORK, default={}): FRAMEWORK_SCHEMA,
         }
@@ -401,6 +412,7 @@ CONFIG_SCHEMA = cv.All(
 
 async def to_code(config):
     cg.add_platformio_option("board", config[CONF_BOARD])
+    cg.add_platformio_option("board_upload.flash_size", config[CONF_FLASH_SIZE])
     cg.add_build_flag("-DUSE_ESP32")
     cg.add_define("ESPHOME_BOARD", config[CONF_BOARD])
     cg.add_build_flag(f"-DUSE_ESP32_VARIANT_{config[CONF_VARIANT]}")
@@ -505,24 +517,46 @@ async def to_code(config):
         )
 
 
-ARDUINO_PARTITIONS_CSV = """\
-nvs,      data, nvs,     0x009000, 0x005000,
-otadata,  data, ota,     0x00e000, 0x002000,
-app0,     app,  ota_0,   0x010000, 0x1C0000,
-app1,     app,  ota_1,   0x1D0000, 0x1C0000,
-eeprom,   data, 0x99,    0x390000, 0x001000,
-spiffs,   data, spiffs,  0x391000, 0x00F000
+APP_PARTITION_SIZES = {
+    "4MB": 0x1C0000,  # 1792 KB
+    "8MB": 0x3C0000,  # 3840 KB
+    "16MB": 0x7C0000,  # 7936 KB
+    "32MB": 0xFC0000,  # 16128 KB
+}
+
+
+def get_arduino_partition_csv(flash_size):
+    app_partition_size = APP_PARTITION_SIZES[flash_size]
+    eeprom_partition_size = 0x1000  # 4 KB
+    spiffs_partition_size = 0xF000  # 60 KB
+
+    app0_partition_start = 0x010000  # 64 KB
+    app1_partition_start = app0_partition_start + app_partition_size
+    eeprom_partition_start = app1_partition_start + app_partition_size
+    spiffs_partition_start = eeprom_partition_start + eeprom_partition_size
+
+    partition_csv = f"""\
+nvs,      data, nvs,     0x9000, 0x5000,
+otadata,  data, ota,     0xE000, 0x2000,
+app0,     app,  ota_0,   0x{app0_partition_start:X}, 0x{app_partition_size:X},
+app1,     app,  ota_1,   0x{app1_partition_start:X}, 0x{app_partition_size:X},
+eeprom,   data, 0x99,    0x{eeprom_partition_start:X}, 0x{eeprom_partition_size:X},
+spiffs,   data, spiffs,  0x{spiffs_partition_start:X}, 0x{spiffs_partition_size:X}
 """
+    return partition_csv
 
 
-IDF_PARTITIONS_CSV = """\
-# Name,   Type, SubType, Offset,   Size, Flags
+def get_idf_partition_csv(flash_size):
+    app_partition_size = APP_PARTITION_SIZES[flash_size]
+
+    partition_csv = f"""\
 otadata,  data, ota,     ,        0x2000,
 phy_init, data, phy,     ,        0x1000,
-app0,     app,  ota_0,   ,      0x1C0000,
-app1,     app,  ota_1,   ,      0x1C0000,
-nvs,      data, nvs,     ,       0x6d000,
+app0,     app,  ota_0,   ,        0x{app_partition_size:X},
+app1,     app,  ota_1,   ,        0x{app_partition_size:X},
+nvs,      data, nvs,     ,        0x6D000,
 """
+    return partition_csv
 
 
 def _format_sdkconfig_val(value: SdkconfigValueType) -> str:
@@ -565,13 +599,17 @@ def copy_files():
     if CORE.using_arduino:
         write_file_if_changed(
             CORE.relative_build_path("partitions.csv"),
-            ARDUINO_PARTITIONS_CSV,
+            get_arduino_partition_csv(
+                CORE.platformio_options.get("board_upload.flash_size")
+            ),
         )
     if CORE.using_esp_idf:
         _write_sdkconfig()
         write_file_if_changed(
             CORE.relative_build_path("partitions.csv"),
-            IDF_PARTITIONS_CSV,
+            get_idf_partition_csv(
+                CORE.platformio_options.get("board_upload.flash_size")
+            ),
         )
         # IDF build scripts look for version string to put in the build.
         # However, if the build path does not have an initialized git repo,
diff --git a/esphome/const.py b/esphome/const.py
index ddba48921e..dde5f23f6f 100644
--- a/esphome/const.py
+++ b/esphome/const.py
@@ -1,6 +1,6 @@
 """Constants used by esphome."""
 
-__version__ = "2023.11.0b5"
+__version__ = "2023.11.0b6"
 
 ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
 VALID_SUBSTITUTIONS_CHARACTERS = (
diff --git a/esphome/dashboard/dashboard.py b/esphome/dashboard/dashboard.py
index 5967c95aba..050564d21e 100644
--- a/esphome/dashboard/dashboard.py
+++ b/esphome/dashboard/dashboard.py
@@ -1,8 +1,8 @@
 from __future__ import annotations
 
+import asyncio
 import base64
 import binascii
-import codecs
 import collections
 import datetime
 import functools
@@ -339,8 +339,8 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
     def handle_stdin(self, json_message):
         if not self.is_process_active:
             return
-        data = json_message["data"]
-        data = codecs.encode(data, "utf8", "replace")
+        text: str = json_message["data"]
+        data = text.encode("utf-8", "replace")
         _LOGGER.debug("< stdin: %s", data)
         self._proc.stdin.write(data)
 
@@ -351,18 +351,18 @@ class EsphomeCommandWebSocket(tornado.websocket.WebSocketHandler):
         while True:
             try:
                 if self._use_popen:
-                    data = yield self._queue.get()
+                    data: bytes = yield self._queue.get()
                     if data is None:
                         self._proc_on_exit(self._proc.poll())
                         break
                 else:
-                    data = yield self._proc.stdout.read_until_regex(reg)
+                    data: bytes = yield self._proc.stdout.read_until_regex(reg)
             except tornado.iostream.StreamClosedError:
                 break
-            data = codecs.decode(data, "utf8", "replace")
 
-            _LOGGER.debug("> stdout: %s", data)
-            self.write_message({"event": "line", "data": data})
+            text = data.decode("utf-8", "replace")
+            _LOGGER.debug("> stdout: %s", text)
+            self.write_message({"event": "line", "data": text})
 
     def _stdout_thread(self):
         if not self._use_popen:
@@ -509,8 +509,8 @@ class EsphomeUpdateAllHandler(EsphomeCommandWebSocket):
 
 class SerialPortRequestHandler(BaseHandler):
     @authenticated
-    def get(self):
-        ports = get_serial_ports()
+    async def get(self):
+        ports = await asyncio.get_running_loop().run_in_executor(None, get_serial_ports)
         data = []
         for port in ports:
             desc = port.description