mirror of
https://github.com/esphome/esphome.git
synced 2025-09-22 05:02:23 +01:00
cleanup
This commit is contained in:
@@ -6,16 +6,21 @@ import gzip
|
|||||||
import hashlib
|
import hashlib
|
||||||
import io
|
import io
|
||||||
import socket
|
import socket
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
from unittest.mock import MagicMock, Mock, call, patch
|
from unittest.mock import MagicMock, Mock, call, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from pytest import CaptureFixture
|
||||||
|
|
||||||
from esphome import espota2
|
from esphome import espota2
|
||||||
from esphome.core import EsphomeError
|
from esphome.core import EsphomeError
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from unittest.mock import Mock as MockType
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_socket():
|
def mock_socket() -> MockType:
|
||||||
"""Create a mock socket for testing."""
|
"""Create a mock socket for testing."""
|
||||||
socket = Mock()
|
socket = Mock()
|
||||||
socket.close = Mock()
|
socket.close = Mock()
|
||||||
@@ -421,40 +426,38 @@ def test_run_ota_wrapper() -> None:
|
|||||||
assert result == (1, None)
|
assert result == (1, None)
|
||||||
|
|
||||||
|
|
||||||
def test_progress_bar() -> None:
|
def test_progress_bar(capsys: CaptureFixture[str]) -> None:
|
||||||
"""Test ProgressBar functionality."""
|
"""Test ProgressBar functionality."""
|
||||||
with (
|
progress = espota2.ProgressBar()
|
||||||
patch("sys.stderr.write") as mock_write,
|
|
||||||
patch("sys.stderr.flush"),
|
|
||||||
):
|
|
||||||
progress = espota2.ProgressBar()
|
|
||||||
|
|
||||||
# Test initial update
|
# Test initial update
|
||||||
progress.update(0.0)
|
progress.update(0.0)
|
||||||
assert mock_write.called
|
captured = capsys.readouterr()
|
||||||
assert "0%" in mock_write.call_args[0][0]
|
assert "0%" in captured.err
|
||||||
|
assert "[" in captured.err
|
||||||
|
|
||||||
# Test progress update
|
# Test progress update
|
||||||
mock_write.reset_mock()
|
progress.update(0.5)
|
||||||
progress.update(0.5)
|
captured = capsys.readouterr()
|
||||||
assert "50%" in mock_write.call_args[0][0]
|
assert "50%" in captured.err
|
||||||
|
|
||||||
# Test completion
|
# Test completion
|
||||||
mock_write.reset_mock()
|
progress.update(1.0)
|
||||||
progress.update(1.0)
|
captured = capsys.readouterr()
|
||||||
assert "100%" in mock_write.call_args[0][0]
|
assert "100%" in captured.err
|
||||||
assert "Done" in mock_write.call_args[0][0]
|
assert "Done" in captured.err
|
||||||
|
|
||||||
# Test done method
|
# Test done method
|
||||||
mock_write.reset_mock()
|
progress.done()
|
||||||
progress.done()
|
captured = capsys.readouterr()
|
||||||
assert mock_write.call_args[0][0] == "\n"
|
assert captured.err == "\n"
|
||||||
|
|
||||||
# Test same progress doesn't update
|
# Test same progress doesn't update
|
||||||
mock_write.reset_mock()
|
progress.update(0.5)
|
||||||
progress.update(0.5)
|
progress.update(0.5)
|
||||||
progress.update(0.5)
|
captured = capsys.readouterr()
|
||||||
assert mock_write.call_count == 1 # Only called once
|
# Should only see one update (second call shouldn't write)
|
||||||
|
assert captured.err.count("50%") == 1
|
||||||
|
|
||||||
|
|
||||||
# Tests for SHA256 authentication (for when PR is merged)
|
# Tests for SHA256 authentication (for when PR is merged)
|
||||||
|
Reference in New Issue
Block a user