Initial project structure: reusable isometric bot engine with D2R implementation
This commit is contained in:
commit
e0282a7111
44 changed files with 3433 additions and 0 deletions
220
engine/screen/capture.py
Normal file
220
engine/screen/capture.py
Normal file
|
|
@ -0,0 +1,220 @@
|
|||
"""Screen capture utilities for taking game screenshots.
|
||||
|
||||
Provides efficient screenshot capture using multiple backends (mss, PIL)
|
||||
with support for specific regions and window targeting.
|
||||
"""
|
||||
|
||||
from typing import Tuple, Optional, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
from PIL import Image, ImageGrab
|
||||
import mss
|
||||
import cv2
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScreenRegion:
|
||||
"""Defines a rectangular region of the screen to capture."""
|
||||
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
|
||||
@property
|
||||
def bounds(self) -> Tuple[int, int, int, int]:
|
||||
"""Return region as (left, top, right, bottom) tuple."""
|
||||
return (self.x, self.y, self.x + self.width, self.y + self.height)
|
||||
|
||||
@property
|
||||
def mss_bounds(self) -> Dict[str, int]:
|
||||
"""Return region in MSS format."""
|
||||
return {
|
||||
"top": self.y,
|
||||
"left": self.x,
|
||||
"width": self.width,
|
||||
"height": self.height,
|
||||
}
|
||||
|
||||
|
||||
class ScreenCapture:
|
||||
"""High-performance screen capture with multiple backends."""
|
||||
|
||||
def __init__(self, backend: str = "mss", monitor: int = 1):
|
||||
"""Initialize screen capture.
|
||||
|
||||
Args:
|
||||
backend: Capture backend ("mss" or "pil")
|
||||
monitor: Monitor number to capture from (1-indexed)
|
||||
"""
|
||||
self.backend = backend
|
||||
self.monitor = monitor
|
||||
self._mss_instance: Optional[mss.mss] = None
|
||||
self._monitor_info: Optional[Dict[str, int]] = None
|
||||
|
||||
if backend == "mss":
|
||||
self._initialize_mss()
|
||||
|
||||
def _initialize_mss(self) -> None:
|
||||
"""Initialize MSS backend."""
|
||||
try:
|
||||
self._mss_instance = mss.mss()
|
||||
monitors = self._mss_instance.monitors
|
||||
|
||||
if self.monitor >= len(monitors):
|
||||
logger.warning(f"Monitor {self.monitor} not found, using primary")
|
||||
self.monitor = 1
|
||||
|
||||
self._monitor_info = monitors[self.monitor]
|
||||
logger.info(f"Initialized MSS capture for monitor {self.monitor}: "
|
||||
f"{self._monitor_info['width']}x{self._monitor_info['height']}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize MSS: {e}")
|
||||
self.backend = "pil"
|
||||
|
||||
def capture_screen(self, region: Optional[ScreenRegion] = None) -> np.ndarray:
|
||||
"""Capture screenshot of screen or region.
|
||||
|
||||
Args:
|
||||
region: Specific region to capture, or None for full screen
|
||||
|
||||
Returns:
|
||||
Screenshot as numpy array in BGR format (for OpenCV compatibility)
|
||||
"""
|
||||
try:
|
||||
if self.backend == "mss":
|
||||
return self._capture_mss(region)
|
||||
else:
|
||||
return self._capture_pil(region)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Screen capture failed: {e}")
|
||||
# Fallback to empty image
|
||||
return np.zeros((100, 100, 3), dtype=np.uint8)
|
||||
|
||||
def _capture_mss(self, region: Optional[ScreenRegion]) -> np.ndarray:
|
||||
"""Capture using MSS backend."""
|
||||
if not self._mss_instance:
|
||||
raise RuntimeError("MSS not initialized")
|
||||
|
||||
if region:
|
||||
monitor = region.mss_bounds
|
||||
else:
|
||||
monitor = self._monitor_info or self._mss_instance.monitors[self.monitor]
|
||||
|
||||
# MSS returns BGRA format
|
||||
screenshot = self._mss_instance.grab(monitor)
|
||||
img_array = np.frombuffer(screenshot.rgb, dtype=np.uint8)
|
||||
img_array = img_array.reshape((screenshot.height, screenshot.width, 3))
|
||||
|
||||
# Convert RGB to BGR for OpenCV
|
||||
return cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
|
||||
|
||||
def _capture_pil(self, region: Optional[ScreenRegion]) -> np.ndarray:
|
||||
"""Capture using PIL backend."""
|
||||
if region:
|
||||
bbox = region.bounds
|
||||
else:
|
||||
bbox = None
|
||||
|
||||
# PIL returns RGB format
|
||||
screenshot = ImageGrab.grab(bbox=bbox)
|
||||
img_array = np.array(screenshot)
|
||||
|
||||
# Convert RGB to BGR for OpenCV
|
||||
return cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
|
||||
|
||||
def save_screenshot(self, filename: str, region: Optional[ScreenRegion] = None) -> bool:
|
||||
"""Save screenshot to file.
|
||||
|
||||
Args:
|
||||
filename: Output filename
|
||||
region: Region to capture, or None for full screen
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
img = self.capture_screen(region)
|
||||
return cv2.imwrite(filename, img)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save screenshot: {e}")
|
||||
return False
|
||||
|
||||
def get_screen_size(self) -> Tuple[int, int]:
|
||||
"""Get screen dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) tuple
|
||||
"""
|
||||
if self.backend == "mss" and self._monitor_info:
|
||||
return (self._monitor_info["width"], self._monitor_info["height"])
|
||||
else:
|
||||
# Use PIL as fallback
|
||||
screenshot = ImageGrab.grab()
|
||||
return screenshot.size
|
||||
|
||||
def find_window(self, window_title: str) -> Optional[ScreenRegion]:
|
||||
"""Find window by title and return its region.
|
||||
|
||||
Args:
|
||||
window_title: Partial or full window title to search for
|
||||
|
||||
Returns:
|
||||
ScreenRegion if window found, None otherwise
|
||||
|
||||
Note:
|
||||
This is a placeholder - actual implementation would use
|
||||
platform-specific window enumeration (e.g., Windows API, X11)
|
||||
"""
|
||||
# TODO: Implement window finding
|
||||
logger.warning("Window finding not implemented yet")
|
||||
return None
|
||||
|
||||
def benchmark_capture(self, iterations: int = 100) -> Dict[str, float]:
|
||||
"""Benchmark capture performance.
|
||||
|
||||
Args:
|
||||
iterations: Number of captures to perform
|
||||
|
||||
Returns:
|
||||
Performance statistics
|
||||
"""
|
||||
logger.info(f"Benchmarking {self.backend} backend ({iterations} iterations)")
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
for _ in range(iterations):
|
||||
self.capture_screen()
|
||||
|
||||
end_time = time.perf_counter()
|
||||
total_time = end_time - start_time
|
||||
avg_time = total_time / iterations
|
||||
fps = iterations / total_time
|
||||
|
||||
stats = {
|
||||
"backend": self.backend,
|
||||
"iterations": iterations,
|
||||
"total_time": total_time,
|
||||
"avg_time_ms": avg_time * 1000,
|
||||
"fps": fps,
|
||||
}
|
||||
|
||||
logger.info(f"Benchmark results: {avg_time*1000:.2f}ms avg, {fps:.1f} FPS")
|
||||
return stats
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry."""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit."""
|
||||
if self._mss_instance:
|
||||
self._mss_instance.close()
|
||||
Loading…
Add table
Add a link
Reference in a new issue