Rewrite to Go: engine, plugin system, D2R plugin, API, loot filter

This commit is contained in:
Hoid 2026-02-14 09:43:39 +00:00
parent e0282a7111
commit 3b363192f2
60 changed files with 1576 additions and 3407 deletions

.gitignore

@@ -1,17 +1,30 @@
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.eggs/
*.egg
.venv/
venv/
env/
# Binaries
iso-bot
*.exe
*.dll
*.so
*.dylib
# Build
/bin/
/dist/
# Config
config/local.yaml
# Logs
logs/
*.log
# IDE
.idea/
.vscode/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Go
vendor/

README.md

@@ -1,132 +1,75 @@
# ISO Bot - Isometric Game Bot Engine
# iso-bot — Isometric Game Bot Engine
A reusable bot engine designed for isometric games, starting with Diablo II: Resurrected. Built on screen-reading and human-like input simulation principles.
A modular, high-performance bot framework for isometric games. Screen-reading only — no memory injection, no hooking, no client modification.
## Project Goals
First game: **Diablo II: Resurrected**
- **Reusable Architecture**: Core engine that can be adapted to different isometric games
- **Screen-Reading Approach**: No memory injection or game modification - purely visual recognition
- **Human-like Behavior**: Realistic mouse movement, timing patterns, and randomization
- **Modular Design**: Clean separation between engine components and game-specific implementations
- **Safety First**: Built-in anti-detection measures and break scheduling
## Architecture
## Architecture Overview
```
cmd/iso-bot/ Entry point (single binary)
pkg/
├── engine/
│ ├── capture/ Screen capture (window, VM, monitor)
│ ├── vision/ Template matching, color detection (GoCV)
│ ├── input/ Mouse (Bézier curves), keyboard, humanization
│ ├── state/ Game state machine with callbacks
│ ├── safety/ Session timing, breaks, pattern randomization
│ ├── navigation/ Pathfinding, click-to-move
│ └── loot/ Declarative rule-based loot filter engine
├── plugin/ Game plugin interface (implement to add a game)
├── api/ REST + WebSocket API for dashboard
└── auth/ License/account validation
plugins/
└── d2r/ Diablo II: Resurrected plugin
├── screens/ State detection (menu, in-game, inventory)
├── routines/ Farming routines (Mephisto, Pindle, Countess)
├── loot/ Default loot filter rules (YAML)
└── templates/ UI template images
web/ React dashboard (future)
config/ YAML configuration
```
### Core Engine (`engine/`)
## Design Principles
The reusable engine provides fundamental bot capabilities:
- **Screen Module**: Screenshot capture, OCR text extraction, template matching
- **Input Module**: Human-like mouse movement with Bézier curves, keyboard input with realistic timing
- **Vision Module**: Computer vision utilities for object detection and color analysis
- **State Module**: Game state management with event system
- **Navigation Module**: Pathfinding algorithms and movement control
- **Safety Module**: Anti-detection measures including timing randomization and break scheduling
### Game Implementations (`games/`)
Game-specific implementations inherit from the core engine:
- **D2R Module**: Diablo II: Resurrected implementation with screen detection, bot routines, and UI templates
### Supporting Components
- **UI Module**: Web dashboard for monitoring and control
- **Config Module**: YAML-based configuration system
- **Tests Module**: Unit tests for engine components
## Technical Approach
### Screen Reading Only
This bot uses **no memory injection or game modification**. All game state detection relies on:
- Screenshot analysis using OpenCV
- OCR text extraction with pytesseract
- Template matching for UI elements
- Color-based object detection
### Human-like Input
All input simulation mimics human behavior:
- **Mouse Movement**: Bézier curves with natural acceleration/deceleration
- **Timing Patterns**: Randomized delays based on realistic human response times
- **Break Scheduling**: Regular breaks with varying durations
- **Behavioral Randomization**: Varied click positions, movement patterns, and reaction times
- **Plugin-based**: Engine is game-agnostic. All game logic in plugins implementing `plugin.Plugin`
- **Screen reading only**: Captures screenshots, analyzes pixels/templates, sends inputs
- **Human-like**: Bézier mouse movement, randomized delays, fatigue, scheduled breaks
- **VM-safe**: Engine runs on host, captures VM window — game never sees the bot
- **Declarative loot**: YAML rule engine for item filtering
- **API-first**: REST + WebSocket for remote dashboard control
## Adding a New Game
1. Create directory under `games/your_game/`
2. Inherit from `engine.state.manager.GameStateManager`
3. Implement screen detection classes in `screens/`
4. Define bot routines in `routines/`
5. Add UI templates for visual recognition
6. Configure game-specific settings
Implement these interfaces from `pkg/plugin`:
- `GameDetector` — detect game state from screenshots
- `ScreenReader` — extract items, enemies, text from screen
- `Routine` — automated farming sequences
- `LootFilter` — item pickup rules
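The interface definitions themselves are not shown in this diff; a minimal Go sketch of what `pkg/plugin` might look like, with method sets inferred from the list above (all names are assumptions, not the actual API):

```go
package main

import "fmt"

// Frame stands in for a captured screenshot (placeholder type).
type Frame struct{ W, H int }

// GameDetector detects high-level game state from a frame.
type GameDetector interface {
	DetectState(f Frame) string
}

// LootFilter decides whether an identified item should be picked up.
type LootFilter interface {
	ShouldPickUp(itemName, quality string) bool
}

// d2rDetector is a toy implementation showing the shape of a plugin.
type d2rDetector struct{}

func (d2rDetector) DetectState(f Frame) string {
	if f.W == 0 || f.H == 0 {
		return "unknown"
	}
	return "in_game"
}

func main() {
	var det GameDetector = d2rDetector{}
	fmt.Println(det.DetectState(Frame{W: 1920, H: 1080})) // in_game
}
```

A plugin then satisfies these interfaces and registers itself with the engine at startup.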
## Diablo II: Resurrected Implementation
The D2R implementation includes:
- **Screen Detection**: Main menu, character select, in-game state, inventory management
- **Bot Routines**: Mephisto runs, Pindleskin runs, Countess runs
- **Template Matching**: UI elements, items, monsters
- **Configuration**: D2R-specific timing, coordinates, and behavior settings
## Installation & Setup
## Usage
```bash
# Install dependencies
pip install -r requirements.txt
# Build
go build -o iso-bot ./cmd/iso-bot
# Configure settings
cp config/default.yaml config/local.yaml
# Edit local.yaml with your settings
# Run bot
python -m games.d2r.game
# Run
./iso-bot --game d2r --routine mephisto --api :8080
```
## Development
## Tech Stack
- **Python 3.11+** required
- **Code Style**: Black formatting, type hints required
- **Testing**: pytest for unit tests
- **Git Workflow**: Feature branches, PR reviews
## Safety & Responsibility
This bot is intended for educational and research purposes. Users are responsible for:
- Compliance with game terms of service
- Ethical use of automation
- Respect for other players
- Following applicable laws and regulations
## Architecture Decisions
- **Screen-reading only**: Maintains game integrity, reduces detection risk
- **Human-like input**: Natural behavior patterns for safety
- **Modular design**: Enables reuse across different games
- **Configuration-driven**: Easy customization without code changes
- **Event-driven state management**: Responsive to changing game conditions
## Current Status
✅ Initial project structure created
⏳ Engine implementation in progress
⏳ D2R game implementation pending
⏳ Web dashboard pending
⏳ Testing framework pending
## Contributing
1. Fork the repository
2. Create feature branch (`git checkout -b feature/amazing-feature`)
3. Commit changes (`git commit -m 'Add amazing feature'`)
4. Push to branch (`git push origin feature/amazing-feature`)
5. Open Pull Request
| Component | Technology |
|-----------|-----------|
| Engine | Go 1.23+ |
| Vision | GoCV (OpenCV bindings) |
| Screen capture | Platform-native (Win32 / X11) |
| Input simulation | Platform-native (SendInput / uinput) |
| API | net/http + gorilla/websocket |
| Dashboard | React + TypeScript (planned) |
| Config | YAML |
| Loot filter | Declarative YAML rules |
## License
This project is for educational purposes. Please respect game terms of service and applicable laws.
Private — proprietary software.


@@ -1,20 +0,0 @@
resolution: [1920, 1080]
loot:
pickup_uniques: true
pickup_sets: true
pickup_rares: true
pickup_runes: true
min_rune_tier: 10 # Thul and above
pickup_gems: false
safety:
health_potion_threshold: 0.5
mana_potion_threshold: 0.3
chicken_threshold: 0.2
character:
class: sorceress
has_teleport: true
primary_skill: blizzard
secondary_skill: glacial_spike
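A sketch of how the Go loot engine might evaluate settings like the ones above (struct and method names are hypothetical, not the actual `pkg/engine/loot` API):

```go
package main

import "fmt"

// LootConfig mirrors the YAML pickup settings above (hypothetical mapping).
type LootConfig struct {
	PickupUniques bool
	PickupRunes   bool
	MinRuneTier   int
	PickupGems    bool
}

// Item is a detected ground item.
type Item struct {
	Quality  string // "unique", "set", "rare", "rune", "gem", ...
	RuneTier int    // only meaningful for runes
}

// ShouldPickUp applies the declarative config to a detected item.
func (c LootConfig) ShouldPickUp(it Item) bool {
	switch it.Quality {
	case "unique":
		return c.PickupUniques
	case "rune":
		return c.PickupRunes && it.RuneTier >= c.MinRuneTier
	case "gem":
		return c.PickupGems
	default:
		return false
	}
}

func main() {
	cfg := LootConfig{PickupUniques: true, PickupRunes: true, MinRuneTier: 10}
	fmt.Println(cfg.ShouldPickUp(Item{Quality: "rune", RuneTier: 12})) // true
	fmt.Println(cfg.ShouldPickUp(Item{Quality: "gem"}))                // false
}
```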


@@ -1,26 +0,0 @@
engine:
screen:
backend: mss
monitor: 1
humanizer:
reaction_min: 0.15
reaction_max: 0.45
mouse_speed_min: 400
mouse_speed_max: 1200
click_jitter: 3
hesitation_chance: 0.1
safety:
session_min_hours: 1.0
session_max_hours: 4.0
break_min_minutes: 10
break_max_minutes: 45
max_daily_hours: 12
game: d2r
routine: mephisto
logging:
level: INFO
file: logs/bot.log


@@ -1,35 +0,0 @@
"""ISO Bot Engine - Core reusable components for isometric game bots.
This module provides the fundamental building blocks for creating bots that work with
isometric games through screen reading and human-like input simulation.
The engine is designed to be game-agnostic, with game-specific implementations
built on top of these core components.
Main Components:
- screen: Screenshot capture, OCR, template matching
- input: Human-like mouse/keyboard input simulation
- vision: Computer vision utilities for object detection
- state: Game state management with event system
- navigation: Pathfinding and movement control
- safety: Anti-detection measures and timing randomization
"""
__version__ = "0.1.0"
__author__ = "Hoid"
from .screen.capture import ScreenCapture
from .input.humanize import HumanInput
from .state.manager import GameStateManager
from .vision.detector import ObjectDetector
from .navigation.pathfinder import Pathfinder
from .safety.timing import SafetyTimer
__all__ = [
"ScreenCapture",
"HumanInput",
"GameStateManager",
"ObjectDetector",
"Pathfinder",
"SafetyTimer",
]


@@ -1,23 +0,0 @@
"""Human-like input simulation for mouse and keyboard interactions.
This module provides tools for generating realistic input patterns that mimic
human behavior, including natural mouse movement curves and timing variations.
Components:
- mouse: Human-like mouse movement with Bézier curves
- keyboard: Keyboard input with realistic timing patterns
- humanize: Central controller for randomized, human-like interactions
"""
from .mouse import MouseController, MousePath
from .keyboard import KeyboardController, KeySequence
from .humanize import HumanInput, InputConfig
__all__ = [
"MouseController",
"MousePath",
"KeyboardController",
"KeySequence",
"HumanInput",
"InputConfig",
]


@@ -1,112 +0,0 @@
"""Human-like behavior patterns for input simulation.
Provides randomization utilities to make bot inputs appear natural,
including variable delays, mouse jitter, and activity scheduling.
"""
import random
import time
import logging
from typing import Tuple, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class HumanProfile:
"""Defines a human behavior profile for input randomization."""
# Reaction time range in seconds
reaction_min: float = 0.15
reaction_max: float = 0.45
# Mouse movement speed range (pixels per second)
mouse_speed_min: float = 400.0
mouse_speed_max: float = 1200.0
# Click position jitter in pixels
click_jitter: int = 3
# Chance of double-reading (hesitation before action)
hesitation_chance: float = 0.1
hesitation_duration: Tuple[float, float] = (0.3, 1.2)
# Break scheduling
micro_break_interval: Tuple[int, int] = (120, 300) # seconds
micro_break_duration: Tuple[int, int] = (2, 8) # seconds
long_break_interval: Tuple[int, int] = (1800, 3600) # seconds
long_break_duration: Tuple[int, int] = (60, 300) # seconds
class Humanizer:
"""Applies human-like randomization to bot actions."""
def __init__(self, profile: Optional[HumanProfile] = None):
self.profile = profile or HumanProfile()
self._last_micro_break = time.time()
self._last_long_break = time.time()
self._next_micro_break = self._schedule_break(self.profile.micro_break_interval)
self._next_long_break = self._schedule_break(self.profile.long_break_interval)
self._action_count = 0
def reaction_delay(self) -> float:
"""Generate a human-like reaction delay."""
base = random.uniform(self.profile.reaction_min, self.profile.reaction_max)
# Occasionally add hesitation
if random.random() < self.profile.hesitation_chance:
base += random.uniform(*self.profile.hesitation_duration)
# Slight fatigue factor based on actions performed
fatigue = min(self._action_count / 1000, 0.3)
base *= (1 + fatigue * random.random())
return base
def jitter_position(self, x: int, y: int) -> Tuple[int, int]:
"""Add small random offset to click position."""
jitter = self.profile.click_jitter
return (
x + random.randint(-jitter, jitter),
y + random.randint(-jitter, jitter),
)
def mouse_speed(self) -> float:
"""Get randomized mouse movement speed."""
return random.uniform(
self.profile.mouse_speed_min,
self.profile.mouse_speed_max,
)
def should_take_break(self) -> Optional[float]:
"""Check if it's time for a break. Returns break duration or None."""
now = time.time()
if now >= self._next_long_break:
duration = random.uniform(*self.profile.long_break_duration)
# _schedule_break returns an absolute timestamp, so add only the break duration
self._next_long_break = self._schedule_break(self.profile.long_break_interval) + duration
logger.info(f"Long break: {duration:.0f}s")
return duration
if now >= self._next_micro_break:
duration = random.uniform(*self.profile.micro_break_duration)
# _schedule_break returns an absolute timestamp, so add only the break duration
self._next_micro_break = self._schedule_break(self.profile.micro_break_interval) + duration
logger.debug(f"Micro break: {duration:.1f}s")
return duration
return None
def wait(self) -> None:
"""Wait for a human-like reaction delay."""
delay = self.reaction_delay()
time.sleep(delay)
self._action_count += 1
def _schedule_break(self, interval: Tuple[int, int]) -> float:
"""Schedule next break with randomized interval."""
return time.time() + random.uniform(*interval)


@@ -1,368 +0,0 @@
"""Human-like keyboard input simulation with realistic timing.
Provides keyboard input with natural typing patterns, including
varied keystroke timing and realistic human typing characteristics.
"""
from typing import List, Dict, Optional, Union
from dataclasses import dataclass
import time
import random
import logging
import pyautogui
logger = logging.getLogger(__name__)
@dataclass
class KeySequence:
"""Represents a sequence of keys with timing information."""
keys: List[str]
delays: List[float]
total_duration: float
class TypingProfile:
"""Defines typing characteristics for human-like input."""
def __init__(self, wpm: int = 60, accuracy: float = 0.95):
"""Initialize typing profile.
Args:
wpm: Words per minute typing speed
accuracy: Typing accuracy (0.0 to 1.0)
"""
self.wpm = wpm
self.accuracy = accuracy
# Calculate base timing from WPM (assuming 5 characters per word)
chars_per_minute = wpm * 5
self.base_char_delay = 60.0 / chars_per_minute
# Timing variations
self.min_delay = 0.02 # Minimum delay between keys
self.max_delay = 0.5 # Maximum delay between keys
self.word_pause = 0.1 # Additional pause after spaces
# Common typing patterns
self.difficult_sequences = {
'th', 'ch', 'sh', 'qu', 'tion', 'ing', 'er', 'ed'
}
# Keys that typically take longer to find
self.slow_keys = {
'q', 'z', 'x', 'j', 'k', 'shift', 'ctrl', 'alt'
}
class KeyboardController:
"""Controller for human-like keyboard input."""
def __init__(self, typing_profile: Optional[TypingProfile] = None):
"""Initialize keyboard controller.
Args:
typing_profile: Typing characteristics, or None for default
"""
self.profile = typing_profile or TypingProfile()
self.shift_held = False
self.ctrl_held = False
self.alt_held = False
# Key mapping for special keys
self.key_mapping = {
'enter': '\n',
'return': '\n',
'tab': '\t',
'space': ' ',
'backspace': 'backspace',
'delete': 'delete',
'escape': 'esc',
'esc': 'esc',
}
def calculate_key_delay(self, key: str, previous_key: Optional[str] = None) -> float:
"""Calculate realistic delay for typing a key.
Args:
key: Key to type
previous_key: Previously typed key for sequence analysis
Returns:
Delay in seconds before typing this key
"""
# Base delay from typing speed
delay = self.profile.base_char_delay
# Adjust for key difficulty
if key.lower() in self.profile.slow_keys:
delay *= random.uniform(1.2, 1.8)
# Adjust for difficult sequences
if previous_key:
sequence = previous_key.lower() + key.lower()
if any(seq in sequence for seq in self.profile.difficult_sequences):
delay *= random.uniform(1.1, 1.5)
# Add natural variation
delay *= random.uniform(0.7, 1.4)
# Extra pause after spaces (word boundaries)
if previous_key and previous_key == ' ':
delay += self.profile.word_pause * random.uniform(0.5, 1.5)
# Clamp to reasonable bounds
return max(self.profile.min_delay, min(self.profile.max_delay, delay))
def type_text(self, text: str, include_errors: bool = False) -> None:
"""Type text with human-like timing and optional errors.
Args:
text: Text to type
include_errors: Whether to include typing errors and corrections
"""
if not text:
return
logger.debug(f"Typing text: '{text[:50]}{'...' if len(text) > 50 else ''}'")
previous_key = None
for i, char in enumerate(text):
# Calculate delay before this character
delay = self.calculate_key_delay(char, previous_key)
# Sleep before typing
time.sleep(delay)
# Occasionally make typing errors if enabled
if include_errors and self.should_make_error():
self.make_typing_error(char)
else:
self.type_key(char)
previous_key = char
def should_make_error(self) -> bool:
"""Determine if a typing error should be made.
Returns:
True if an error should be made
"""
return random.random() > self.profile.accuracy
def make_typing_error(self, intended_key: str) -> None:
"""Make a typing error and correct it.
Args:
intended_key: The key that was supposed to be typed
"""
# Type wrong key (usually adjacent on keyboard)
wrong_key = self.get_adjacent_key(intended_key)
self.type_key(wrong_key)
# Pause as human realizes mistake
time.sleep(random.uniform(0.1, 0.4))
# Backspace to correct
self.type_key('backspace')
time.sleep(random.uniform(0.05, 0.15))
# Type correct key
self.type_key(intended_key)
def get_adjacent_key(self, key: str) -> str:
"""Get an adjacent key for typing errors.
Args:
key: Original key
Returns:
Adjacent key that could be mistyped
"""
# Simplified adjacent key mapping
adjacent_map = {
'a': 'sq', 'b': 'vgn', 'c': 'xvd', 'd': 'sfe', 'e': 'wrd',
'f': 'dgr', 'g': 'fht', 'h': 'gyu', 'i': 'uko', 'j': 'hnu',
'k': 'jmo', 'l': 'kpo', 'm': 'njk', 'n': 'bhm', 'o': 'ilp',
'p': 'ol', 'q': 'wa', 'r': 'etf', 's': 'adw', 't': 'rgy',
'u': 'yhi', 'v': 'cfg', 'w': 'qse', 'x': 'zdc', 'y': 'tgu',
'z': 'xas'
}
adjacent_keys = adjacent_map.get(key.lower(), 'abcd')
return random.choice(adjacent_keys)
def type_key(self, key: str) -> None:
"""Type a single key.
Args:
key: Key to type
"""
# Handle special keys
if key.lower() in self.key_mapping:
mapped_key = self.key_mapping[key.lower()]
if mapped_key in ['backspace', 'delete', 'esc']:
pyautogui.press(mapped_key)
else:
pyautogui.write(mapped_key)
else:
pyautogui.write(key)
def press_key_combination(self, *keys: str) -> None:
"""Press a combination of keys (e.g., Ctrl+C).
Args:
keys: Keys to press together
"""
logger.debug(f"Pressing key combination: {'+'.join(keys)}")
# Press all keys down
for key in keys:
pyautogui.keyDown(key)
time.sleep(random.uniform(0.01, 0.03))
# Hold briefly
time.sleep(random.uniform(0.05, 0.1))
# Release all keys (in reverse order)
for key in reversed(keys):
pyautogui.keyUp(key)
time.sleep(random.uniform(0.01, 0.03))
def press_key(self, key: str, duration: Optional[float] = None) -> None:
"""Press and release a key.
Args:
key: Key to press
duration: How long to hold key, or None for quick press
"""
if duration is None:
pyautogui.press(key)
else:
pyautogui.keyDown(key)
time.sleep(duration)
pyautogui.keyUp(key)
def hold_key(self, key: str) -> None:
"""Start holding a key down.
Args:
key: Key to hold
"""
pyautogui.keyDown(key)
# Track modifier keys
if key.lower() == 'shift':
self.shift_held = True
elif key.lower() in ['ctrl', 'control']:
self.ctrl_held = True
elif key.lower() == 'alt':
self.alt_held = True
def release_key(self, key: str) -> None:
"""Stop holding a key.
Args:
key: Key to release
"""
pyautogui.keyUp(key)
# Track modifier keys
if key.lower() == 'shift':
self.shift_held = False
elif key.lower() in ['ctrl', 'control']:
self.ctrl_held = False
elif key.lower() == 'alt':
self.alt_held = False
def release_all_keys(self) -> None:
"""Release all held modifier keys."""
if self.shift_held:
self.release_key('shift')
if self.ctrl_held:
self.release_key('ctrl')
if self.alt_held:
self.release_key('alt')
def type_number_sequence(self, numbers: Union[str, int],
use_numpad: bool = False) -> None:
"""Type a sequence of numbers.
Args:
numbers: Numbers to type
use_numpad: Whether to use numpad keys
"""
number_str = str(numbers)
for digit in number_str:
if digit.isdigit():
if use_numpad:
key = f'num{digit}'
else:
key = digit
self.type_key(key)
time.sleep(self.calculate_key_delay(digit))
def simulate_pause(self, pause_type: str = 'thinking') -> None:
"""Simulate natural pauses in typing.
Args:
pause_type: Type of pause ('thinking', 'reading', 'short')
"""
if pause_type == 'thinking':
duration = random.uniform(0.5, 2.0)
elif pause_type == 'reading':
duration = random.uniform(0.2, 0.8)
else: # short
duration = random.uniform(0.1, 0.3)
logger.debug(f"Simulating {pause_type} pause for {duration:.2f}s")
time.sleep(duration)
def generate_key_sequence(self, text: str) -> KeySequence:
"""Generate a key sequence with timing for given text.
Args:
text: Text to generate sequence for
Returns:
KeySequence with keys and delays
"""
keys = list(text)
delays = []
total_duration = 0.0
previous_key = None
for key in keys:
delay = self.calculate_key_delay(key, previous_key)
delays.append(delay)
total_duration += delay
previous_key = key
return KeySequence(keys, delays, total_duration)
def set_typing_speed(self, wpm: int) -> None:
"""Set typing speed.
Args:
wpm: Words per minute
"""
self.profile.wpm = max(10, min(200, wpm))
chars_per_minute = self.profile.wpm * 5
self.profile.base_char_delay = 60.0 / chars_per_minute
logger.info(f"Typing speed set to {self.profile.wpm} WPM")
def set_accuracy(self, accuracy: float) -> None:
"""Set typing accuracy.
Args:
accuracy: Accuracy from 0.0 to 1.0
"""
self.profile.accuracy = max(0.0, min(1.0, accuracy))
logger.info(f"Typing accuracy set to {self.profile.accuracy * 100:.1f}%")


@@ -1,345 +0,0 @@
"""Human-like mouse movement and clicking with Bézier curves.
Provides realistic mouse movement patterns using Bézier curves with
randomized control points and natural acceleration/deceleration.
"""
from typing import Tuple, List, Optional, Callable
from dataclasses import dataclass
import time
import math
import random
import logging
import pyautogui
import numpy as np
logger = logging.getLogger(__name__)
# Disable pyautogui failsafe for production use
pyautogui.FAILSAFE = False
@dataclass
class MousePath:
"""Represents a mouse movement path with timing."""
points: List[Tuple[int, int]]
delays: List[float]
total_duration: float
class BezierCurve:
"""Bézier curve generation for natural mouse movement."""
@staticmethod
def cubic_bezier(t: float, p0: Tuple[float, float], p1: Tuple[float, float],
p2: Tuple[float, float], p3: Tuple[float, float]) -> Tuple[float, float]:
"""Calculate point on cubic Bézier curve at parameter t.
Args:
t: Parameter from 0 to 1
p0: Start point
p1: First control point
p2: Second control point
p3: End point
Returns:
(x, y) point on curve
"""
x = (1-t)**3 * p0[0] + 3*(1-t)**2*t * p1[0] + 3*(1-t)*t**2 * p2[0] + t**3 * p3[0]
y = (1-t)**3 * p0[1] + 3*(1-t)**2*t * p1[1] + 3*(1-t)*t**2 * p2[1] + t**3 * p3[1]
return (x, y)
@staticmethod
def generate_control_points(start: Tuple[int, int], end: Tuple[int, int],
randomness: float = 0.3) -> Tuple[Tuple[float, float], Tuple[float, float]]:
"""Generate random control points for natural curve.
Args:
start: Starting position
end: Ending position
randomness: Amount of randomness (0.0 to 1.0)
Returns:
Tuple of two control points
"""
dx = end[0] - start[0]
dy = end[1] - start[1]
distance = math.sqrt(dx*dx + dy*dy)
# Control point offset based on distance and randomness
offset_magnitude = distance * randomness * random.uniform(0.2, 0.8)
# Random angles for control points
angle1 = random.uniform(-math.pi, math.pi)
angle2 = random.uniform(-math.pi, math.pi)
# First control point (closer to start)
cp1_x = start[0] + dx * 0.25 + math.cos(angle1) * offset_magnitude
cp1_y = start[1] + dy * 0.25 + math.sin(angle1) * offset_magnitude
# Second control point (closer to end)
cp2_x = start[0] + dx * 0.75 + math.cos(angle2) * offset_magnitude
cp2_y = start[1] + dy * 0.75 + math.sin(angle2) * offset_magnitude
return ((cp1_x, cp1_y), (cp2_x, cp2_y))
class MouseController:
"""Controller for human-like mouse interactions."""
def __init__(self):
"""Initialize mouse controller."""
self.current_pos = pyautogui.position()
self.movement_speed = 1.0 # Multiplier for movement speed
self.click_variance = 3 # Pixel variance for click positions
# Movement timing parameters
self.min_duration = 0.1 # Minimum movement time
self.max_duration = 1.5 # Maximum movement time
self.base_speed = 1000 # Base pixels per second
def get_current_position(self) -> Tuple[int, int]:
"""Get current mouse position.
Returns:
(x, y) tuple of current position
"""
self.current_pos = pyautogui.position()
return self.current_pos
def calculate_movement_duration(self, start: Tuple[int, int],
end: Tuple[int, int]) -> float:
"""Calculate realistic movement duration based on distance.
Args:
start: Starting position
end: Ending position
Returns:
Movement duration in seconds
"""
dx = end[0] - start[0]
dy = end[1] - start[1]
distance = math.sqrt(dx*dx + dy*dy)
# Fitts' Law inspired calculation
# Time increases logarithmically with distance
base_time = distance / (self.base_speed * self.movement_speed)
fitts_factor = math.log2(1 + distance / 10) / 10
duration = base_time + fitts_factor
# Add some randomness
duration *= random.uniform(0.8, 1.2)
# Clamp to reasonable bounds
return max(self.min_duration, min(self.max_duration, duration))
def generate_movement_path(self, start: Tuple[int, int], end: Tuple[int, int],
duration: Optional[float] = None,
steps: Optional[int] = None) -> MousePath:
"""Generate Bézier curve path for mouse movement.
Args:
start: Starting position
end: Ending position
duration: Movement duration, or None to calculate
steps: Number of steps, or None to calculate
Returns:
MousePath with points and timing
"""
if duration is None:
duration = self.calculate_movement_duration(start, end)
if steps is None:
# Calculate steps based on distance and duration
distance = math.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2)
steps = max(10, int(distance / 10)) # Roughly 10 pixels per step
# Generate control points
cp1, cp2 = BezierCurve.generate_control_points(start, end)
# Generate path points
points = []
delays = []
for i in range(steps + 1):
t = i / steps
# Use ease-in-out curve for timing
timing_t = self._ease_in_out(t)
# Calculate position on Bézier curve
x, y = BezierCurve.cubic_bezier(timing_t, start, cp1, cp2, end)
points.append((int(x), int(y)))
# Calculate delay for this step
if i < steps:
delay = duration / steps
# Add small random variation
delay *= random.uniform(0.8, 1.2)
delays.append(delay)
return MousePath(points, delays, duration)
def move_to(self, target: Tuple[int, int], duration: Optional[float] = None) -> None:
"""Move mouse to target position using Bézier curve.
Args:
target: Target (x, y) position
duration: Movement duration, or None to calculate
"""
start = self.get_current_position()
path = self.generate_movement_path(start, target, duration)
logger.debug(f"Moving mouse from {start} to {target} in {path.total_duration:.2f}s")
for i, point in enumerate(path.points[1:], 1):
pyautogui.moveTo(point[0], point[1], duration=0)
if i <= len(path.delays):
time.sleep(path.delays[i-1])
self.current_pos = target
def click(self, position: Optional[Tuple[int, int]] = None,
button: str = 'left', move_first: bool = True) -> None:
"""Click at specified position with human-like variation.
Args:
position: Click position, or None for current position
button: Mouse button ('left', 'right', 'middle')
move_first: Whether to move to position first
"""
if position is None:
position = self.get_current_position()
else:
# Add small random offset for more human-like clicking
offset_x = random.randint(-self.click_variance, self.click_variance)
offset_y = random.randint(-self.click_variance, self.click_variance)
position = (position[0] + offset_x, position[1] + offset_y)
if move_first and position != self.get_current_position():
self.move_to(position)
# Random pre-click delay
time.sleep(random.uniform(0.01, 0.05))
logger.debug(f"Clicking {button} button at {position}")
pyautogui.click(position[0], position[1], button=button)
# Random post-click delay
time.sleep(random.uniform(0.01, 0.08))
def double_click(self, position: Optional[Tuple[int, int]] = None,
move_first: bool = True) -> None:
"""Double-click at specified position.
Args:
position: Click position, or None for current position
move_first: Whether to move to position first
"""
if position is None:
position = self.get_current_position()
if move_first and position != self.get_current_position():
self.move_to(position)
# Random delay before double-click
time.sleep(random.uniform(0.01, 0.05))
logger.debug(f"Double-clicking at {position}")
pyautogui.doubleClick(position[0], position[1])
# Random delay after double-click
time.sleep(random.uniform(0.05, 0.1))
def drag(self, start: Tuple[int, int], end: Tuple[int, int],
button: str = 'left', duration: Optional[float] = None) -> None:
"""Drag from start to end position.
Args:
start: Starting position
end: Ending position
button: Mouse button to drag with
duration: Drag duration, or None to calculate
"""
# Move to start position
self.move_to(start)
# Mouse down
time.sleep(random.uniform(0.01, 0.03))
pyautogui.mouseDown(start[0], start[1], button=button)
# Wait briefly before starting drag
time.sleep(random.uniform(0.05, 0.1))
# Generate drag path
path = self.generate_movement_path(start, end, duration)
logger.debug(f"Dragging from {start} to {end}")
# Execute drag movement
for i, point in enumerate(path.points[1:], 1):
pyautogui.moveTo(point[0], point[1], duration=0)
if i <= len(path.delays):
time.sleep(path.delays[i-1])
# Mouse up
time.sleep(random.uniform(0.01, 0.03))
pyautogui.mouseUp(end[0], end[1], button=button)
self.current_pos = end
def scroll(self, clicks: int, position: Optional[Tuple[int, int]] = None) -> None:
"""Scroll at specified position.
Args:
clicks: Number of scroll clicks (positive = up, negative = down)
position: Scroll position, or None for current position
"""
if position is not None and position != self.get_current_position():
self.move_to(position)
# Random delay before scrolling
time.sleep(random.uniform(0.05, 0.15))
# Scroll with small delays between clicks for more human-like behavior
for i in range(abs(clicks)):
scroll_direction = 1 if clicks > 0 else -1
pyautogui.scroll(scroll_direction)
if i < abs(clicks) - 1: # Don't delay after last scroll
time.sleep(random.uniform(0.02, 0.08))
def _ease_in_out(self, t: float) -> float:
"""Ease-in-out function for smooth acceleration/deceleration.
Args:
t: Input parameter (0 to 1)
Returns:
Eased parameter (0 to 1)
"""
return t * t * (3.0 - 2.0 * t)
def set_movement_speed(self, speed: float) -> None:
"""Set movement speed multiplier.
Args:
speed: Speed multiplier (1.0 = normal, 2.0 = double speed, etc.)
"""
self.movement_speed = max(0.1, min(5.0, speed))
logger.info(f"Mouse movement speed set to {self.movement_speed}x")
def set_click_variance(self, variance: int) -> None:
"""Set click position variance in pixels.
Args:
variance: Maximum pixel offset for clicks
"""
self.click_variance = max(0, min(10, variance))
logger.info(f"Click variance set to {self.click_variance} pixels")
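The core math here, cubic Bézier evaluation plus the smoothstep timing curve, carries over to Go unchanged; a minimal sketch:

```go
package main

import "fmt"

// Pt is a 2D point.
type Pt struct{ X, Y float64 }

// cubicBezier evaluates the curve at parameter t in [0,1],
// matching the formula in BezierCurve.cubic_bezier above.
func cubicBezier(t float64, p0, p1, p2, p3 Pt) Pt {
	u := 1 - t
	x := u*u*u*p0.X + 3*u*u*t*p1.X + 3*u*t*t*p2.X + t*t*t*p3.X
	y := u*u*u*p0.Y + 3*u*u*t*p1.Y + 3*u*t*t*p2.Y + t*t*t*p3.Y
	return Pt{x, y}
}

// easeInOut is the same smoothstep timing curve as _ease_in_out.
func easeInOut(t float64) float64 { return t * t * (3 - 2*t) }

func main() {
	start, end := Pt{0, 0}, Pt{100, 50}
	cp1, cp2 := Pt{30, 40}, Pt{70, -10}
	// The curve always starts at the start point and ends at the end point,
	// regardless of the randomized control points.
	fmt.Println(cubicBezier(easeInOut(0), start, cp1, cp2, end)) // {0 0}
	fmt.Println(cubicBezier(easeInOut(1), start, cp1, cp2, end)) // {100 50}
}
```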


@@ -1,46 +0,0 @@
"""Character movement control for isometric games.
Handles click-to-move navigation with human-like patterns.
"""
from typing import Tuple, Optional
import logging
import time
import numpy as np
from engine.input.mouse import MouseController
from engine.input.humanize import Humanizer
from engine.navigation.pathfinder import Waypoint, WaypointGraph
logger = logging.getLogger(__name__)
class MovementController:
"""Controls character movement via click-to-move."""
def __init__(self, mouse: MouseController, humanizer: Humanizer):
self.mouse = mouse
self.humanizer = humanizer
self.waypoints = WaypointGraph()
def click_to_move(self, x: int, y: int) -> None:
"""Click a screen position to move there."""
jx, jy = self.humanizer.jitter_position(x, y)
self.mouse.move_to(jx, jy)
self.humanizer.wait()
self.mouse.click()
def navigate_waypoints(self, start: str, goal: str) -> bool:
"""Navigate between named waypoints."""
path = self.waypoints.find_path(start, goal)
if not path:
logger.warning(f"No path from {start} to {goal}")
return False
for waypoint in path[1:]: # Skip start
self.click_to_move(waypoint.screen_x, waypoint.screen_y)
# Wait for movement (game-specific timing)
time.sleep(self.humanizer.reaction_delay() + 0.5)
return True
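`click_to_move` jitters the target before clicking via `Humanizer.jitter_position`. Its exact bounds aren't shown in this file, so here is a hypothetical stand-in sketch of bounded per-axis jitter, just to illustrate the idea:

```python
import random

def jitter_position(x: int, y: int, radius: int = 3) -> tuple[int, int]:
    # Hypothetical jitter: offset the click target by up to ±radius
    # pixels on each axis so repeated clicks never land identically
    return (x + random.randint(-radius, radius),
            y + random.randint(-radius, radius))

jx, jy = jitter_position(400, 300)  # somewhere in the 7x7 box around (400, 300)
```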

@ -1,78 +0,0 @@
"""Pathfinding for isometric game navigation.
Implements A* and click-to-move navigation for isometric games
where the bot needs to move between known locations.
"""
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass
import heapq
import math
import logging
logger = logging.getLogger(__name__)
@dataclass
class Waypoint:
"""A named location in the game world."""
name: str
screen_x: int
screen_y: int
metadata: Optional[Dict] = None
class WaypointGraph:
"""Graph of connected waypoints for navigation."""
def __init__(self):
self._waypoints: Dict[str, Waypoint] = {}
self._edges: Dict[str, List[str]] = {}
def add_waypoint(self, waypoint: Waypoint) -> None:
self._waypoints[waypoint.name] = waypoint
self._edges.setdefault(waypoint.name, [])
def connect(self, name_a: str, name_b: str, bidirectional: bool = True) -> None:
self._edges.setdefault(name_a, []).append(name_b)
if bidirectional:
self._edges.setdefault(name_b, []).append(name_a)
def find_path(self, start: str, goal: str) -> Optional[List[Waypoint]]:
"""A* pathfinding between waypoints."""
if start not in self._waypoints or goal not in self._waypoints:
return None
goal_wp = self._waypoints[goal]
def heuristic(name: str) -> float:
wp = self._waypoints[name]
return math.hypot(wp.screen_x - goal_wp.screen_x, wp.screen_y - goal_wp.screen_y)
open_set = [(heuristic(start), 0, start)]
came_from: Dict[str, str] = {}
g_score: Dict[str, float] = {start: 0}
while open_set:
_, cost, current = heapq.heappop(open_set)
if current == goal:
path = []
while current in came_from:
path.append(self._waypoints[current])
current = came_from[current]
path.append(self._waypoints[start])
return list(reversed(path))
for neighbor in self._edges.get(current, []):
n_wp = self._waypoints[neighbor]
c_wp = self._waypoints[current]
edge_cost = math.hypot(n_wp.screen_x - c_wp.screen_x, n_wp.screen_y - c_wp.screen_y)
tentative_g = g_score[current] + edge_cost
if tentative_g < g_score.get(neighbor, float('inf')):
came_from[neighbor] = current
g_score[neighbor] = tentative_g
heapq.heappush(open_set, (tentative_g + heuristic(neighbor), tentative_g, neighbor))
return None
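The A* search above can be exercised with a toy graph. A self-contained sketch with plain dicts standing in for `WaypointGraph` (coordinates are arbitrary demo data), using the same Euclidean edge costs and heuristic:

```python
import heapq
import math

# name -> (x, y) screen coordinates (arbitrary toy data)
waypoints = {"town": (0, 0), "gate": (3, 4), "boss": (6, 8)}
edges = {"town": ["gate"], "gate": ["town", "boss"], "boss": ["gate"]}

def find_path(start: str, goal: str):
    def h(n: str) -> float:
        # Straight-line distance to the goal (admissible heuristic)
        (x, y), (gx, gy) = waypoints[n], waypoints[goal]
        return math.hypot(x - gx, y - gy)

    open_set = [(h(start), start)]
    came_from, g = {}, {start: 0.0}
    while open_set:
        _, cur = heapq.heappop(open_set)
        if cur == goal:
            path = [cur]
            while cur in came_from:
                cur = came_from[cur]
                path.append(cur)
            return path[::-1]
        for nb in edges[cur]:
            (x1, y1), (x2, y2) = waypoints[cur], waypoints[nb]
            ng = g[cur] + math.hypot(x2 - x1, y2 - y1)
            if ng < g.get(nb, float("inf")):
                came_from[nb], g[nb] = cur, ng
                heapq.heappush(open_set, (ng + h(nb), nb))
    return None  # goal unreachable

print(find_path("town", "boss"))  # ['town', 'gate', 'boss']
```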

@ -1,73 +0,0 @@
"""Behavioral pattern randomization for anti-detection.
Varies bot behavior to avoid detectable patterns like identical
farming routes, consistent timing, or perfect execution.
"""
import random
import logging
from typing import List, Tuple, Callable, Any
logger = logging.getLogger(__name__)
class RouteRandomizer:
"""Randomizes farming routes and action sequences."""
def __init__(self, variation_factor: float = 0.15):
self.variation_factor = variation_factor
def shuffle_optional_steps(
self, steps: List[Any], required_indices: List[int] = None,
) -> List[Any]:
"""Shuffle non-required steps while keeping required ones in order."""
required_indices = set(required_indices or [])
required = [(i, s) for i, s in enumerate(steps) if i in required_indices]
optional = [s for i, s in enumerate(steps) if i not in required_indices]
random.shuffle(optional)
result = []
opt_iter = iter(optional)
req_iter = iter(required)
next_req = next(req_iter, None)
for i in range(len(steps)):
if next_req and next_req[0] == i:
result.append(next_req[1])
next_req = next(req_iter, None)
else:
result.append(next(opt_iter))
return result
def vary_route(
self, waypoints: List[Tuple[int, int]],
) -> List[Tuple[int, int]]:
"""Add slight variations to a route's waypoints."""
varied = []
for x, y in waypoints:
offset_x = int(x * self.variation_factor * random.uniform(-1, 1))
offset_y = int(y * self.variation_factor * random.uniform(-1, 1))
varied.append((x + offset_x, y + offset_y))
return varied
def should_skip_optional(self, skip_chance: float = 0.1) -> bool:
"""Randomly decide to skip an optional action."""
return random.random() < skip_chance
class ActionVariator:
"""Varies how actions are performed."""
@staticmethod
def vary_count(target: int, variance: int = 1) -> int:
"""Vary a repeat count (e.g., click 2-4 times instead of always 3)."""
return max(1, target + random.randint(-variance, variance))
@staticmethod
def random_order(actions: List[Callable]) -> List[Callable]:
"""Randomize the order of independent actions."""
shuffled = actions.copy()
random.shuffle(shuffled)
return shuffled
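`ActionVariator.vary_count` is the simplest of these patterns and easy to verify in isolation: the result stays within `target ± variance` but never drops below one repetition. A standalone sketch of the same logic:

```python
import random

def vary_count(target: int, variance: int = 1) -> int:
    # e.g. click 2-4 times instead of always exactly 3,
    # but never return fewer than one repetition
    return max(1, target + random.randint(-variance, variance))

counts = {vary_count(3) for _ in range(200)}  # subset of {2, 3, 4}
```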

@ -1,68 +0,0 @@
"""Anti-detection timing and break scheduling.
Manages play sessions with realistic timing patterns to avoid
behavioral detection systems.
"""
import random
import time
import logging
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class SessionSchedule:
"""Defines a play session schedule."""
min_session_hours: float = 1.0
max_session_hours: float = 4.0
min_break_minutes: float = 10.0
max_break_minutes: float = 45.0
max_daily_hours: float = 12.0
class SessionTimer:
"""Manages bot session timing to mimic human play patterns."""
def __init__(self, schedule: Optional[SessionSchedule] = None):
self.schedule = schedule or SessionSchedule()
self._session_start = time.time()
self._daily_playtime = 0.0
self._day_start = time.time()
self._target_duration = self._roll_session_duration()
def _roll_session_duration(self) -> float:
"""Generate random session duration in seconds."""
hours = random.uniform(
self.schedule.min_session_hours,
self.schedule.max_session_hours,
)
return hours * 3600
def session_elapsed(self) -> float:
"""Seconds elapsed in current session."""
return time.time() - self._session_start
def should_stop_session(self) -> bool:
"""Check if current session should end."""
if self.session_elapsed() >= self._target_duration:
return True
if self._daily_playtime + self.session_elapsed() >= self.schedule.max_daily_hours * 3600:
return True
return False
def get_break_duration(self) -> float:
"""Get randomized break duration in seconds."""
return random.uniform(
self.schedule.min_break_minutes * 60,
self.schedule.max_break_minutes * 60,
)
def start_new_session(self) -> None:
"""Start a new play session after break."""
self._daily_playtime += self.session_elapsed()
self._session_start = time.time()
self._target_duration = self._roll_session_duration()
logger.info(f"New session: {self._target_duration/3600:.1f}h target")
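The core of `SessionTimer` is `_roll_session_duration`: every session gets a fresh random target length so no two runs look alike. A minimal standalone sketch of that roll with the default 1-4 hour bounds:

```python
import random

def roll_session_seconds(min_hours: float = 1.0, max_hours: float = 4.0) -> float:
    # Fresh random target per session; returned in seconds
    return random.uniform(min_hours, max_hours) * 3600

duration = roll_session_seconds()  # somewhere between 3600 and 14400 seconds
```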

@ -1,23 +0,0 @@
"""Screen reading components for visual game state detection.
This module provides tools for capturing, analyzing, and extracting information
from game screenshots without requiring memory access or game modification.
Components:
- capture: Screenshot capture using various backends
- ocr: Optical Character Recognition for text extraction
- template: Template matching for UI element detection
"""
from .capture import ScreenCapture, ScreenRegion
from .ocr import OCREngine, TextDetector
from .template import TemplateManager, TemplateMatcher
__all__ = [
"ScreenCapture",
"ScreenRegion",
"OCREngine",
"TextDetector",
"TemplateManager",
"TemplateMatcher",
]

@ -1,220 +0,0 @@
"""Screen capture utilities for taking game screenshots.
Provides efficient screenshot capture using multiple backends (mss, PIL)
with support for specific regions and window targeting.
"""
from typing import Tuple, Optional, Dict, Any
from dataclasses import dataclass
import time
import logging
import numpy as np
from PIL import Image, ImageGrab
import mss
import cv2
logger = logging.getLogger(__name__)
@dataclass
class ScreenRegion:
"""Defines a rectangular region of the screen to capture."""
x: int
y: int
width: int
height: int
@property
def bounds(self) -> Tuple[int, int, int, int]:
"""Return region as (left, top, right, bottom) tuple."""
return (self.x, self.y, self.x + self.width, self.y + self.height)
@property
def mss_bounds(self) -> Dict[str, int]:
"""Return region in MSS format."""
return {
"top": self.y,
"left": self.x,
"width": self.width,
"height": self.height,
}
class ScreenCapture:
"""High-performance screen capture with multiple backends."""
def __init__(self, backend: str = "mss", monitor: int = 1):
"""Initialize screen capture.
Args:
backend: Capture backend ("mss" or "pil")
monitor: Monitor number to capture from (1-indexed)
"""
self.backend = backend
self.monitor = monitor
self._mss_instance: Optional[mss.mss] = None
self._monitor_info: Optional[Dict[str, int]] = None
if backend == "mss":
self._initialize_mss()
def _initialize_mss(self) -> None:
"""Initialize MSS backend."""
try:
self._mss_instance = mss.mss()
monitors = self._mss_instance.monitors
if self.monitor >= len(monitors):
logger.warning(f"Monitor {self.monitor} not found, using primary")
self.monitor = 1
self._monitor_info = monitors[self.monitor]
logger.info(f"Initialized MSS capture for monitor {self.monitor}: "
f"{self._monitor_info['width']}x{self._monitor_info['height']}")
except Exception as e:
logger.error(f"Failed to initialize MSS: {e}")
self.backend = "pil"
def capture_screen(self, region: Optional[ScreenRegion] = None) -> np.ndarray:
"""Capture screenshot of screen or region.
Args:
region: Specific region to capture, or None for full screen
Returns:
Screenshot as numpy array in BGR format (for OpenCV compatibility)
"""
try:
if self.backend == "mss":
return self._capture_mss(region)
else:
return self._capture_pil(region)
except Exception as e:
logger.error(f"Screen capture failed: {e}")
# Fallback to empty image
return np.zeros((100, 100, 3), dtype=np.uint8)
def _capture_mss(self, region: Optional[ScreenRegion]) -> np.ndarray:
"""Capture using MSS backend."""
if not self._mss_instance:
raise RuntimeError("MSS not initialized")
if region:
monitor = region.mss_bounds
else:
monitor = self._monitor_info or self._mss_instance.monitors[self.monitor]
# mss grabs BGRA internally; the .rgb accessor yields packed RGB bytes

screenshot = self._mss_instance.grab(monitor)
img_array = np.frombuffer(screenshot.rgb, dtype=np.uint8)
img_array = img_array.reshape((screenshot.height, screenshot.width, 3))
# Convert RGB to BGR for OpenCV
return cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
def _capture_pil(self, region: Optional[ScreenRegion]) -> np.ndarray:
"""Capture using PIL backend."""
if region:
bbox = region.bounds
else:
bbox = None
# PIL returns RGB format
screenshot = ImageGrab.grab(bbox=bbox)
img_array = np.array(screenshot)
# Convert RGB to BGR for OpenCV
return cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
def save_screenshot(self, filename: str, region: Optional[ScreenRegion] = None) -> bool:
"""Save screenshot to file.
Args:
filename: Output filename
region: Region to capture, or None for full screen
Returns:
True if successful, False otherwise
"""
try:
img = self.capture_screen(region)
return cv2.imwrite(filename, img)
except Exception as e:
logger.error(f"Failed to save screenshot: {e}")
return False
def get_screen_size(self) -> Tuple[int, int]:
"""Get screen dimensions.
Returns:
(width, height) tuple
"""
if self.backend == "mss" and self._monitor_info:
return (self._monitor_info["width"], self._monitor_info["height"])
else:
# Use PIL as fallback
screenshot = ImageGrab.grab()
return screenshot.size
def find_window(self, window_title: str) -> Optional[ScreenRegion]:
"""Find window by title and return its region.
Args:
window_title: Partial or full window title to search for
Returns:
ScreenRegion if window found, None otherwise
Note:
This is a placeholder - actual implementation would use
platform-specific window enumeration (e.g., Windows API, X11)
"""
# TODO: Implement window finding
logger.warning("Window finding not implemented yet")
return None
def benchmark_capture(self, iterations: int = 100) -> Dict[str, float]:
"""Benchmark capture performance.
Args:
iterations: Number of captures to perform
Returns:
Performance statistics
"""
logger.info(f"Benchmarking {self.backend} backend ({iterations} iterations)")
start_time = time.perf_counter()
for _ in range(iterations):
self.capture_screen()
end_time = time.perf_counter()
total_time = end_time - start_time
avg_time = total_time / iterations
fps = iterations / total_time
stats = {
"backend": self.backend,
"iterations": iterations,
"total_time": total_time,
"avg_time_ms": avg_time * 1000,
"fps": fps,
}
logger.info(f"Benchmark results: {avg_time*1000:.2f}ms avg, {fps:.1f} FPS")
return stats
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
if self._mss_instance:
self._mss_instance.close()
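The two bounds properties on `ScreenRegion` exist because the backends disagree on layout: PIL's `ImageGrab.grab` wants a `(left, top, right, bottom)` tuple, while mss wants a `top/left/width/height` dict. A trimmed-down standalone check of that geometry:

```python
from dataclasses import dataclass

@dataclass
class Region:
    x: int
    y: int
    width: int
    height: int

    @property
    def bounds(self):
        # (left, top, right, bottom), as PIL's ImageGrab expects
        return (self.x, self.y, self.x + self.width, self.y + self.height)

    @property
    def mss_bounds(self):
        # dict keyed top/left/width/height, as mss.grab expects
        return {"top": self.y, "left": self.x,
                "width": self.width, "height": self.height}

r = Region(100, 50, 640, 480)
print(r.bounds)  # (100, 50, 740, 530)
```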

@ -1,346 +0,0 @@
"""OCR (Optical Character Recognition) for extracting text from screenshots.
Provides text detection and extraction capabilities using pytesseract
with preprocessing for better accuracy in game environments.
"""
from typing import List, Dict, Optional, Tuple, NamedTuple
import logging
import re
import cv2
import numpy as np
import pytesseract
from PIL import Image
logger = logging.getLogger(__name__)
class TextMatch(NamedTuple):
"""Represents detected text with position and confidence."""
text: str
confidence: float
bbox: Tuple[int, int, int, int] # (x, y, width, height)
class OCRConfig:
"""Configuration for OCR processing."""
def __init__(self):
# Tesseract configuration
self.tesseract_config = "--oem 3 --psm 6"  # OEM 3: default engine; PSM 6: single uniform text block
self.language = "eng"
self.min_confidence = 30.0
# Image preprocessing
self.preprocess = True
self.scale_factor = 2.0
self.denoise = True
self.contrast_enhance = True
# Text filtering
self.min_text_length = 1
self.filter_patterns = [
r'^[a-zA-Z0-9\s\-_:.,/]+$', # Alphanumeric with common punctuation
]
class OCREngine:
"""OCR engine for text extraction from game screenshots."""
def __init__(self, config: Optional[OCRConfig] = None):
"""Initialize OCR engine.
Args:
config: OCR configuration, or None for defaults
"""
self.config = config or OCRConfig()
self._verify_tesseract()
def _verify_tesseract(self) -> None:
"""Verify tesseract installation."""
try:
pytesseract.get_tesseract_version()
logger.info("Tesseract initialized successfully")
except Exception as e:
logger.error(f"Tesseract not found or not working: {e}")
raise RuntimeError("Tesseract OCR is required but not available")
def extract_text(self, image: np.ndarray, region: Optional[Tuple[int, int, int, int]] = None) -> str:
"""Extract all text from image.
Args:
image: Input image as numpy array
region: Optional (x, y, width, height) region to process
Returns:
Extracted text as string
"""
processed_img = self._preprocess_image(image, region)
try:
text = pytesseract.image_to_string(
processed_img,
lang=self.config.language,
config=self.config.tesseract_config
)
return self._clean_text(text)
except Exception as e:
logger.error(f"OCR extraction failed: {e}")
return ""
def find_text(self, image: np.ndarray, search_text: str,
case_sensitive: bool = False) -> List[TextMatch]:
"""Find specific text in image with positions.
Args:
image: Input image as numpy array
search_text: Text to search for
case_sensitive: Whether search should be case sensitive
Returns:
List of TextMatch objects for found text
"""
processed_img = self._preprocess_image(image)
try:
# Get detailed OCR data
data = pytesseract.image_to_data(
processed_img,
lang=self.config.language,
config=self.config.tesseract_config,
output_type=pytesseract.Output.DICT
)
matches = []
search_lower = search_text.lower() if not case_sensitive else search_text
for i in range(len(data['text'])):
text = data['text'][i].strip()
confidence = float(data['conf'][i])
if confidence < self.config.min_confidence:
continue
text_to_match = text.lower() if not case_sensitive else text
if search_lower in text_to_match:
bbox = (
data['left'][i],
data['top'][i],
data['width'][i],
data['height'][i]
)
matches.append(TextMatch(text, confidence, bbox))
return matches
except Exception as e:
logger.error(f"Text search failed: {e}")
return []
def get_text_regions(self, image: np.ndarray) -> List[TextMatch]:
"""Get all text regions with positions and confidence.
Args:
image: Input image as numpy array
Returns:
List of TextMatch objects for all detected text
"""
processed_img = self._preprocess_image(image)
try:
data = pytesseract.image_to_data(
processed_img,
lang=self.config.language,
config=self.config.tesseract_config,
output_type=pytesseract.Output.DICT
)
text_regions = []
for i in range(len(data['text'])):
text = data['text'][i].strip()
confidence = float(data['conf'][i])
if (confidence < self.config.min_confidence or
len(text) < self.config.min_text_length):
continue
if not self._passes_text_filters(text):
continue
bbox = (
data['left'][i],
data['top'][i],
data['width'][i],
data['height'][i]
)
text_regions.append(TextMatch(text, confidence, bbox))
return text_regions
except Exception as e:
logger.error(f"Text region detection failed: {e}")
return []
def _preprocess_image(self, image: np.ndarray,
region: Optional[Tuple[int, int, int, int]] = None) -> Image.Image:
"""Preprocess image for better OCR accuracy.
Args:
image: Input image as numpy array
region: Optional region to extract
Returns:
Preprocessed PIL Image
"""
# Extract region if specified
if region:
x, y, w, h = region
image = image[y:y+h, x:x+w]
if not self.config.preprocess:
return Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Scale up for better OCR
if self.config.scale_factor > 1.0:
height, width = gray.shape
new_width = int(width * self.config.scale_factor)
new_height = int(height * self.config.scale_factor)
gray = cv2.resize(gray, (new_width, new_height), interpolation=cv2.INTER_CUBIC)
# Denoise
if self.config.denoise:
gray = cv2.fastNlMeansDenoising(gray)
# Enhance contrast
if self.config.contrast_enhance:
# Use CLAHE (Contrast Limited Adaptive Histogram Equalization)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
gray = clahe.apply(gray)
# Convert back to PIL Image
return Image.fromarray(gray)
def _clean_text(self, text: str) -> str:
"""Clean extracted text.
Args:
text: Raw extracted text
Returns:
Cleaned text
"""
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text.strip())
# Remove common OCR artifacts
text = re.sub(r'[|¦]', 'I', text) # Vertical bars to I
text = re.sub(r'[{}]', '', text) # Remove braces
return text
def _passes_text_filters(self, text: str) -> bool:
"""Check if text passes configured filters.
Args:
text: Text to check
Returns:
True if text passes filters
"""
if not self.config.filter_patterns:
return True
for pattern in self.config.filter_patterns:
if re.match(pattern, text):
return True
return False
class TextDetector:
"""High-level text detection interface."""
def __init__(self, ocr_config: Optional[OCRConfig] = None):
"""Initialize text detector.
Args:
ocr_config: OCR configuration
"""
self.ocr = OCREngine(ocr_config)
self.text_cache: Dict[str, List[TextMatch]] = {}
def contains_text(self, image: np.ndarray, text: str,
case_sensitive: bool = False) -> bool:
"""Check if image contains specific text.
Args:
image: Input image
text: Text to search for
case_sensitive: Case sensitive search
Returns:
True if text found
"""
matches = self.ocr.find_text(image, text, case_sensitive)
return len(matches) > 0
def wait_for_text(self, capture_func, text: str, timeout: float = 10.0,
check_interval: float = 0.5) -> bool:
"""Wait for specific text to appear on screen.
Args:
capture_func: Function that returns screenshot
text: Text to wait for
timeout: Maximum wait time in seconds
check_interval: Time between checks in seconds
Returns:
True if text appeared, False if timeout
"""
import time
start_time = time.time()
while time.time() - start_time < timeout:
image = capture_func()
if self.contains_text(image, text):
return True
time.sleep(check_interval)
return False
def get_ui_text(self, image: np.ndarray) -> Dict[str, str]:
"""Extract common UI text elements.
Args:
image: Input image
Returns:
Dictionary mapping UI elements to text
"""
# This is a placeholder for game-specific UI text extraction
# In practice, this would define regions for health, mana, inventory, etc.
text_regions = self.ocr.get_text_regions(image)
ui_text = {}
for region in text_regions:
# Categorize text based on position or pattern
if "health" in region.text.lower():
ui_text["health"] = region.text
elif "mana" in region.text.lower():
ui_text["mana"] = region.text
# Add more UI element detection
return ui_text
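`_clean_text` above is pure regex and needs no Tesseract to verify. A standalone sketch of the same three passes, showing how a typical misread item label comes out:

```python
import re

def clean_text(text: str) -> str:
    # Collapse runs of whitespace, then repair the two most common
    # Tesseract artifacts seen on game fonts
    text = re.sub(r"\s+", " ", text.strip())
    text = re.sub(r"[|¦]", "I", text)  # vertical bars misread as capital I
    return re.sub(r"[{}]", "", text)   # stray braces from UI borders

print(clean_text("  |ron   {Sword}  "))  # Iron Sword
```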

@ -1,403 +0,0 @@
"""Template matching for UI element detection in game screenshots.
Provides efficient template matching using OpenCV with support for
multiple templates, confidence thresholds, and template management.
"""
from typing import List, Dict, Optional, Tuple, NamedTuple
from pathlib import Path
import logging
from dataclasses import dataclass
import cv2
import numpy as np
logger = logging.getLogger(__name__)
class TemplateMatch(NamedTuple):
"""Represents a template match with position and confidence."""
template_name: str
confidence: float
center: Tuple[int, int] # (x, y) center position
bbox: Tuple[int, int, int, int] # (x, y, width, height)
@dataclass
class TemplateInfo:
"""Information about a loaded template."""
name: str
image: np.ndarray
width: int
height: int
path: Optional[str] = None
class TemplateMatcher:
"""Core template matching functionality."""
def __init__(self, method: int = cv2.TM_CCOEFF_NORMED,
threshold: float = 0.8):
"""Initialize template matcher.
Args:
method: OpenCV template matching method
threshold: Minimum confidence threshold (0.0 to 1.0)
"""
self.method = method
self.threshold = threshold
def match_template(self, image: np.ndarray, template: np.ndarray,
threshold: Optional[float] = None) -> List[TemplateMatch]:
"""Match single template in image.
Args:
image: Source image to search in
template: Template image to find
threshold: Confidence threshold override
Returns:
List of matches found
"""
if threshold is None:
threshold = self.threshold
# Convert to grayscale if needed
if len(image.shape) == 3:
image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
image_gray = image
if len(template.shape) == 3:
template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
else:
template_gray = template
# Perform template matching
result = cv2.matchTemplate(image_gray, template_gray, self.method)
# Find matches above threshold
locations = np.where(result >= threshold)
matches = []
template_h, template_w = template_gray.shape
for pt in zip(*locations[::-1]): # Switch x and y
x, y = pt
confidence = result[y, x]
center = (x + template_w // 2, y + template_h // 2)
bbox = (x, y, template_w, template_h)
matches.append(TemplateMatch("", confidence, center, bbox))
# Remove overlapping matches (Non-Maximum Suppression)
matches = self._apply_nms(matches, overlap_threshold=0.3)
return matches
def match_multiple_scales(self, image: np.ndarray, template: np.ndarray,
scales: Optional[List[float]] = None,
threshold: Optional[float] = None) -> List[TemplateMatch]:
"""Match template at multiple scales.
Args:
image: Source image
template: Template image
scales: List of scale factors to try
threshold: Confidence threshold
Returns:
List of matches at all scales
"""
if scales is None:
scales = [0.8, 0.9, 1.0, 1.1, 1.2]
all_matches = []
for scale in scales:
# Scale template
new_width = int(template.shape[1] * scale)
new_height = int(template.shape[0] * scale)
if new_width < 10 or new_height < 10:
continue # Skip very small templates
scaled_template = cv2.resize(template, (new_width, new_height))
# Find matches at this scale
matches = self.match_template(image, scaled_template, threshold)
all_matches.extend(matches)
# Apply NMS across all scales
all_matches = self._apply_nms(all_matches, overlap_threshold=0.5)
return all_matches
def _apply_nms(self, matches: List[TemplateMatch],
overlap_threshold: float = 0.3) -> List[TemplateMatch]:
"""Apply Non-Maximum Suppression to remove overlapping matches.
Args:
matches: List of template matches
overlap_threshold: Maximum allowed overlap ratio
Returns:
Filtered list of matches
"""
if not matches:
return matches
# Sort by confidence (highest first)
matches = sorted(matches, key=lambda x: x.confidence, reverse=True)
filtered_matches = []
for match in matches:
# Check if this match overlaps significantly with any kept match
is_duplicate = False
for kept_match in filtered_matches:
if self._calculate_overlap(match, kept_match) > overlap_threshold:
is_duplicate = True
break
if not is_duplicate:
filtered_matches.append(match)
return filtered_matches
def _calculate_overlap(self, match1: TemplateMatch, match2: TemplateMatch) -> float:
"""Calculate overlap ratio between two matches.
Args:
match1: First match
match2: Second match
Returns:
Overlap ratio (0.0 to 1.0)
"""
x1, y1, w1, h1 = match1.bbox
x2, y2, w2, h2 = match2.bbox
# Calculate intersection
left = max(x1, x2)
right = min(x1 + w1, x2 + w2)
top = max(y1, y2)
bottom = min(y1 + h1, y2 + h2)
if left >= right or top >= bottom:
return 0.0
intersection = (right - left) * (bottom - top)
area1 = w1 * h1
area2 = w2 * h2
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0.0
class TemplateManager:
"""Manages a collection of templates for game UI detection."""
def __init__(self, template_dir: Optional[Path] = None):
"""Initialize template manager.
Args:
template_dir: Directory containing template images
"""
self.template_dir = template_dir
self.templates: Dict[str, TemplateInfo] = {}
self.matcher = TemplateMatcher()
if template_dir and template_dir.exists():
self.load_templates_from_directory(template_dir)
def load_template(self, name: str, image_path: Path) -> bool:
"""Load single template from file.
Args:
name: Template identifier
image_path: Path to template image
Returns:
True if loaded successfully
"""
try:
image = cv2.imread(str(image_path))
if image is None:
logger.error(f"Could not load template image: {image_path}")
return False
height, width = image.shape[:2]
self.templates[name] = TemplateInfo(
name=name,
image=image,
width=width,
height=height,
path=str(image_path)
)
logger.info(f"Loaded template '{name}' ({width}x{height})")
return True
except Exception as e:
logger.error(f"Failed to load template '{name}': {e}")
return False
def load_templates_from_directory(self, directory: Path) -> int:
"""Load all templates from directory.
Args:
directory: Directory containing template images
Returns:
Number of templates loaded
"""
loaded_count = 0
for image_path in directory.glob("*.png"):
template_name = image_path.stem
if self.load_template(template_name, image_path):
loaded_count += 1
logger.info(f"Loaded {loaded_count} templates from {directory}")
return loaded_count
def find_template(self, image: np.ndarray, template_name: str,
threshold: Optional[float] = None) -> List[TemplateMatch]:
"""Find specific template in image.
Args:
image: Source image
template_name: Name of template to find
threshold: Confidence threshold override
Returns:
List of matches found
"""
if template_name not in self.templates:
logger.warning(f"Template '{template_name}' not found")
return []
template_info = self.templates[template_name]
matches = self.matcher.match_template(image, template_info.image, threshold)
# Set template name in matches
named_matches = []
for match in matches:
named_match = TemplateMatch(
template_name=template_name,
confidence=match.confidence,
center=match.center,
bbox=match.bbox
)
named_matches.append(named_match)
return named_matches
def find_any_template(self, image: np.ndarray,
template_names: Optional[List[str]] = None,
threshold: Optional[float] = None) -> List[TemplateMatch]:
"""Find any of the specified templates in image.
Args:
image: Source image
template_names: List of template names to search for, or None for all
threshold: Confidence threshold override
Returns:
List of all matches found
"""
if template_names is None:
template_names = list(self.templates.keys())
all_matches = []
for template_name in template_names:
matches = self.find_template(image, template_name, threshold)
all_matches.extend(matches)
# Sort by confidence
all_matches.sort(key=lambda x: x.confidence, reverse=True)
return all_matches
def wait_for_template(self, capture_func, template_name: str,
timeout: float = 10.0, check_interval: float = 0.5,
threshold: Optional[float] = None) -> Optional[TemplateMatch]:
"""Wait for template to appear on screen.
Args:
capture_func: Function that returns screenshot
template_name: Template to wait for
timeout: Maximum wait time in seconds
check_interval: Time between checks in seconds
threshold: Confidence threshold override
Returns:
First match found, or None if timeout
"""
import time
start_time = time.time()
while time.time() - start_time < timeout:
image = capture_func()
matches = self.find_template(image, template_name, threshold)
if matches:
return matches[0] # Return best match
time.sleep(check_interval)
return None
def get_template_info(self, template_name: str) -> Optional[TemplateInfo]:
"""Get information about loaded template.
Args:
template_name: Name of template
Returns:
TemplateInfo object or None if not found
"""
return self.templates.get(template_name)
def list_templates(self) -> List[str]:
"""Get list of all loaded template names.
Returns:
List of template names
"""
return list(self.templates.keys())
def create_debug_image(self, image: np.ndarray, matches: List[TemplateMatch]) -> np.ndarray:
"""Create debug image showing template matches.
Args:
image: Original image
matches: List of matches to highlight
Returns:
Debug image with matches drawn
"""
debug_img = image.copy()
for match in matches:
x, y, w, h = match.bbox
# Draw bounding box
cv2.rectangle(debug_img, (x, y), (x + w, y + h), (0, 255, 0), 2)
# Draw center point
center_x, center_y = match.center
cv2.circle(debug_img, (center_x, center_y), 5, (255, 0, 0), -1)
# Draw template name and confidence
label = f"{match.template_name}: {match.confidence:.2f}"
cv2.putText(debug_img, label, (x, y - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
return debug_img
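`_calculate_overlap` is plain intersection-over-union on `(x, y, w, h)` boxes, and the NMS step keeps a match only if its IoU with every already-kept match stays under the threshold. A standalone check of the IoU arithmetic:

```python
def overlap_ratio(b1, b2):
    # Intersection-over-union of two (x, y, w, h) boxes
    x1, y1, w1, h1 = b1
    x2, y2, w2, h2 = b2
    left, top = max(x1, x2), max(y1, y2)
    right, bottom = min(x1 + w1, x2 + w2), min(y1 + h1, y2 + h2)
    if left >= right or top >= bottom:
        return 0.0  # disjoint boxes
    inter = (right - left) * (bottom - top)
    union = w1 * h1 + w2 * h2 - inter
    return inter / union

# Two 10x10 boxes shifted 5px apart share a 5x10 strip: 50 / 150
print(overlap_ratio((0, 0, 10, 10), (5, 0, 10, 10)))  # 0.3333...
```

With the default NMS threshold of 0.3, these two boxes (IoU ≈ 0.33) would be treated as duplicates and the lower-confidence one dropped.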

@ -1,34 +0,0 @@
"""Event system for inter-component communication."""
from typing import Callable, Any, Dict, List
import logging
logger = logging.getLogger(__name__)
class EventBus:
"""Simple publish/subscribe event system."""
def __init__(self):
self._listeners: Dict[str, List[Callable]] = {}
def on(self, event: str, callback: Callable) -> None:
"""Subscribe to an event."""
self._listeners.setdefault(event, []).append(callback)
def off(self, event: str, callback: Callable) -> None:
"""Unsubscribe from an event."""
if event in self._listeners:
self._listeners[event] = [cb for cb in self._listeners[event] if cb != callback]
def emit(self, event: str, **data: Any) -> None:
"""Emit an event to all subscribers."""
for cb in self._listeners.get(event, []):
try:
cb(**data)
except Exception as e:
logger.error(f"Event handler error for '{event}': {e}")
def clear(self) -> None:
"""Remove all listeners."""
self._listeners.clear()
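Usage of the bus is two calls: `on` to subscribe, `emit` to publish keyword data to every subscriber. A trimmed-down standalone version (error handling and `off` omitted) showing the round trip:

```python
class EventBus:
    def __init__(self):
        self._listeners = {}

    def on(self, event, callback):
        # Append, so multiple subscribers per event are supported
        self._listeners.setdefault(event, []).append(callback)

    def emit(self, event, **data):
        # Fan keyword data out to every subscriber of this event
        for cb in self._listeners.get(event, []):
            cb(**data)

bus = EventBus()
seen = []
bus.on("item_found", lambda name, value: seen.append((name, value)))
bus.emit("item_found", name="Shako", value=35000)
print(seen)  # [('Shako', 35000)]
```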

@ -1,105 +0,0 @@
"""Game state machine management.
Provides a base state manager that game implementations extend
to detect and track game states (menu, in-game, inventory, etc.).
"""
from typing import Optional, Callable, Dict, Any
from enum import Enum, auto
from dataclasses import dataclass
import logging
import time
import numpy as np
logger = logging.getLogger(__name__)
class BaseGameState(Enum):
"""Base states common to most games."""
UNKNOWN = auto()
LOADING = auto()
MAIN_MENU = auto()
CHARACTER_SELECT = auto()
IN_GAME = auto()
INVENTORY = auto()
DEAD = auto()
DISCONNECTED = auto()
@dataclass
class StateTransition:
"""Records a state transition."""
from_state: BaseGameState
to_state: BaseGameState
timestamp: float
metadata: Optional[Dict[str, Any]] = None
class GameStateManager:
"""Base class for game state detection and management.
Game implementations should subclass this and implement
detect_state() with game-specific screen analysis.
"""
def __init__(self):
self._current_state: BaseGameState = BaseGameState.UNKNOWN
self._previous_state: BaseGameState = BaseGameState.UNKNOWN
self._state_enter_time: float = time.time()
self._history: list[StateTransition] = []
self._callbacks: Dict[BaseGameState, list[Callable]] = {}
@property
def current_state(self) -> BaseGameState:
return self._current_state
@property
def previous_state(self) -> BaseGameState:
return self._previous_state
@property
def time_in_state(self) -> float:
"""Seconds spent in current state."""
return time.time() - self._state_enter_time
def detect_state(self, screen: np.ndarray) -> BaseGameState:
"""Detect current game state from screenshot.
Must be overridden by game implementations.
"""
raise NotImplementedError("Subclasses must implement detect_state()")
def update(self, screen: np.ndarray) -> BaseGameState:
"""Update state from current screen. Triggers callbacks on change."""
new_state = self.detect_state(screen)
if new_state != self._current_state:
transition = StateTransition(
from_state=self._current_state,
to_state=new_state,
timestamp=time.time(),
)
self._history.append(transition)
logger.info(f"State: {self._current_state.name} → {new_state.name}")
self._previous_state = self._current_state
self._current_state = new_state
self._state_enter_time = time.time()
# Fire callbacks
for cb in self._callbacks.get(new_state, []):
try:
cb(transition)
except Exception as e:
logger.error(f"State callback error: {e}")
return self._current_state
def on_state(self, state: BaseGameState, callback: Callable) -> None:
"""Register a callback for when entering a state."""
self._callbacks.setdefault(state, []).append(callback)
def is_state(self, state: BaseGameState) -> bool:
return self._current_state == state

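The replacement lives in pkg/engine/state, which isn't part of this chunk; a minimal Go sketch of the same transition-plus-callbacks idea (identifiers illustrative):

```go
package main

import "fmt"

// State is a minimal Go counterpart of BaseGameState (illustrative;
// the real Go state package is not shown in this diff).
type State int

const (
	Unknown State = iota
	MainMenu
	InGame
)

// Machine tracks the current state and fires callbacks when a new
// state is entered, like GameStateManager.update.
type Machine struct {
	current   State
	callbacks map[State][]func(from, to State)
}

func NewMachine() *Machine {
	return &Machine{callbacks: make(map[State][]func(State, State))}
}

// On registers a callback for entering a state.
func (m *Machine) On(s State, cb func(from, to State)) {
	m.callbacks[s] = append(m.callbacks[s], cb)
}

// Set transitions to s; no-op if already in that state.
func (m *Machine) Set(s State) {
	if s == m.current {
		return
	}
	from := m.current
	m.current = s
	for _, cb := range m.callbacks[s] {
		cb(from, s)
	}
}

func main() {
	m := NewMachine()
	m.On(InGame, func(from, to State) {
		fmt.Println("entered in-game state")
	})
	m.Set(InGame)
}
```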
@@ -1,87 +0,0 @@
"""Color and pixel analysis utilities.
Provides tools for reading health/mana bars, detecting UI states
via color sampling, and pixel-level game state detection.
"""
from typing import Tuple, Optional, List
import logging
import numpy as np
import cv2
logger = logging.getLogger(__name__)
class ColorAnalyzer:
"""Analyze pixel colors and UI bar states."""
@staticmethod
def get_pixel_color(screen: np.ndarray, x: int, y: int) -> Tuple[int, int, int]:
"""Get BGR color at pixel position."""
return tuple(screen[y, x].tolist())
@staticmethod
def get_pixel_hsv(screen: np.ndarray, x: int, y: int) -> Tuple[int, int, int]:
"""Get HSV color at pixel position."""
hsv = cv2.cvtColor(screen[y:y+1, x:x+1], cv2.COLOR_BGR2HSV)
return tuple(hsv[0, 0].tolist())
@staticmethod
def color_matches(
color: Tuple[int, int, int],
target: Tuple[int, int, int],
tolerance: int = 20,
) -> bool:
"""Check if a color matches target within tolerance."""
return all(abs(c - t) <= tolerance for c, t in zip(color, target))
@staticmethod
def read_bar_percentage(
screen: np.ndarray,
bar_region: Tuple[int, int, int, int],
filled_color_hsv: Tuple[Tuple[int, int, int], Tuple[int, int, int]],
) -> float:
"""Read a horizontal bar's fill percentage (health, mana, xp, etc.).
Args:
screen: Screenshot in BGR
bar_region: (x, y, width, height) of the bar
filled_color_hsv: (lower_hsv, upper_hsv) range of the filled portion
Returns:
Fill percentage 0.0 to 1.0
"""
x, y, w, h = bar_region
bar = screen[y:y+h, x:x+w]
hsv = cv2.cvtColor(bar, cv2.COLOR_BGR2HSV)
lower, upper = filled_color_hsv
mask = cv2.inRange(hsv, np.array(lower), np.array(upper))
# Scan columns left to right to find the fill boundary
col_fill = np.mean(mask, axis=0) / 255.0
# Find the rightmost column that's mostly filled
threshold = 0.3
filled_cols = np.where(col_fill > threshold)[0]
if len(filled_cols) == 0:
return 0.0
return (filled_cols[-1] + 1) / w
@staticmethod
def sample_region_dominant_color(
screen: np.ndarray,
region: Tuple[int, int, int, int],
k: int = 3,
) -> Tuple[int, int, int]:
"""Get the dominant BGR color in a region via k-means clustering."""
x, y, w, h = region
roi = screen[y:y+h, x:x+w]
pixels = roi.reshape(-1, 3).astype(np.float32)
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
_, labels, centers = cv2.kmeans(pixels, k, None, criteria, 3, cv2.KMEANS_RANDOM_CENTERS)
# With k=1 this would reduce to the region's mean color; instead,
# pick the center of the largest cluster as the dominant color.
counts = np.bincount(labels.flatten(), minlength=k)
return tuple(centers[np.argmax(counts)].astype(int).tolist())

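The column-scan in read_bar_percentage carries over to the Go engine unchanged in spirit; isolated as a standalone sketch of the rightmost-filled-column computation (function names illustrative):

```go
package main

import "fmt"

// barFill mirrors read_bar_percentage: colFill holds the per-column
// fraction of "filled" pixels (0..1); the bar percentage is the
// rightmost column above threshold, normalized by bar width.
func barFill(colFill []float64, threshold float64) float64 {
	last := -1
	for i, f := range colFill {
		if f > threshold {
			last = i
		}
	}
	if last < 0 {
		return 0
	}
	return float64(last+1) / float64(len(colFill))
}

func main() {
	cols := []float64{0.9, 0.9, 0.8, 0.7, 0.1, 0.0, 0.0, 0.0}
	fmt.Printf("%.2f\n", barFill(cols, 0.3)) // 4 of 8 columns filled -> 0.50
}
```

Scanning for the boundary column rather than averaging the whole mask makes the reading robust to the orb's glass highlights and rounded edges.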
@@ -1,140 +0,0 @@
"""Object and UI element detection using computer vision.
Provides high-level detection for game elements using template matching,
color filtering, and contour analysis.
"""
from typing import List, Optional, Tuple
from dataclasses import dataclass
import logging
import numpy as np
import cv2
logger = logging.getLogger(__name__)
@dataclass
class Detection:
"""Represents a detected object/element on screen."""
x: int
y: int
width: int
height: int
confidence: float
label: str = ""
@property
def center(self) -> Tuple[int, int]:
return (self.x + self.width // 2, self.y + self.height // 2)
@property
def bounds(self) -> Tuple[int, int, int, int]:
return (self.x, self.y, self.x + self.width, self.y + self.height)
class ElementDetector:
"""Detects game UI elements and objects via computer vision."""
def __init__(self, confidence_threshold: float = 0.8):
self.confidence_threshold = confidence_threshold
self._templates: dict[str, np.ndarray] = {}
def load_template(self, name: str, image_path: str) -> None:
"""Load a template image for matching."""
template = cv2.imread(image_path, cv2.IMREAD_COLOR)
if template is None:
raise FileNotFoundError(f"Template not found: {image_path}")
self._templates[name] = template
logger.debug(f"Loaded template '{name}': {template.shape}")
def find_template(
self, screen: np.ndarray, template_name: str,
method: int = cv2.TM_CCOEFF_NORMED,
) -> Optional[Detection]:
"""Find best match of a template in the screen image."""
if template_name not in self._templates:
logger.error(f"Unknown template: {template_name}")
return None
template = self._templates[template_name]
result = cv2.matchTemplate(screen, template, method)
_, max_val, _, max_loc = cv2.minMaxLoc(result)
if max_val >= self.confidence_threshold:
h, w = template.shape[:2]
return Detection(
x=max_loc[0], y=max_loc[1],
width=w, height=h,
confidence=max_val, label=template_name,
)
return None
def find_all_templates(
self, screen: np.ndarray, template_name: str,
method: int = cv2.TM_CCOEFF_NORMED,
) -> List[Detection]:
"""Find all matches of a template above confidence threshold."""
if template_name not in self._templates:
return []
template = self._templates[template_name]
h, w = template.shape[:2]
result = cv2.matchTemplate(screen, template, method)
locations = np.where(result >= self.confidence_threshold)
detections = []
for pt in zip(*locations[::-1]):
detections.append(Detection(
x=pt[0], y=pt[1], width=w, height=h,
confidence=result[pt[1], pt[0]], label=template_name,
))
# Non-maximum suppression (simple distance-based)
return self._nms(detections, distance_threshold=min(w, h) // 2)
def find_by_color(
self, screen: np.ndarray, lower_hsv: Tuple[int, int, int],
upper_hsv: Tuple[int, int, int], min_area: int = 100,
label: str = "",
) -> List[Detection]:
"""Find objects by HSV color range."""
hsv = cv2.cvtColor(screen, cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, np.array(lower_hsv), np.array(upper_hsv))
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
detections = []
for contour in contours:
area = cv2.contourArea(contour)
if area >= min_area:
x, y, w, h = cv2.boundingRect(contour)
detections.append(Detection(
x=x, y=y, width=w, height=h,
confidence=area / (w * h), label=label,
))
return detections
def _nms(self, detections: List[Detection], distance_threshold: int) -> List[Detection]:
"""Simple non-maximum suppression by distance."""
if not detections:
return []
detections.sort(key=lambda d: d.confidence, reverse=True)
kept = []
for det in detections:
too_close = False
for k in kept:
dx = abs(det.center[0] - k.center[0])
dy = abs(det.center[1] - k.center[1])
if dx < distance_threshold and dy < distance_threshold:
too_close = True
break
if not too_close:
kept.append(det)
return kept

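The distance-based suppression in _nms translates directly to the Go rewrite; a self-contained sketch (all names illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

// det is a minimal detection: center point plus confidence (illustrative).
type det struct {
	cx, cy int
	conf   float64
}

// nms keeps the highest-confidence detection per neighborhood, mirroring
// the distance-based suppression in ElementDetector._nms.
func nms(dets []det, dist int) []det {
	sort.Slice(dets, func(i, j int) bool { return dets[i].conf > dets[j].conf })
	var kept []det
	for _, d := range dets {
		tooClose := false
		for _, k := range kept {
			if abs(d.cx-k.cx) < dist && abs(d.cy-k.cy) < dist {
				tooClose = true
				break
			}
		}
		if !tooClose {
			kept = append(kept, d)
		}
	}
	return kept
}

func abs(a int) int {
	if a < 0 {
		return -a
	}
	return a
}

func main() {
	out := nms([]det{{10, 10, 0.9}, {12, 11, 0.8}, {100, 100, 0.7}}, 20)
	fmt.Println(len(out)) // the two overlapping detections collapse into one
}
```

Sorting by confidence first guarantees the survivor of each cluster is its best match, which matters because matchTemplate produces a dense blob of near-duplicate hits around every true location.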
@@ -1,83 +0,0 @@
"""Diablo II: Resurrected configuration.
Game-specific settings for screen regions, colors, and timings.
"""
from dataclasses import dataclass, field
from typing import Dict, Tuple
@dataclass
class D2RScreenRegions:
"""Screen regions for UI elements at 1920x1080."""
health_orb: Tuple[int, int, int, int] = (28, 545, 170, 170)
mana_orb: Tuple[int, int, int, int] = (1722, 545, 170, 170)
xp_bar: Tuple[int, int, int, int] = (0, 1058, 1920, 22)
belt: Tuple[int, int, int, int] = (838, 1010, 244, 48)
minimap: Tuple[int, int, int, int] = (1600, 0, 320, 320)
inventory: Tuple[int, int, int, int] = (960, 330, 530, 440)
stash: Tuple[int, int, int, int] = (430, 330, 530, 440)
chat: Tuple[int, int, int, int] = (0, 800, 500, 200)
skill_left: Tuple[int, int, int, int] = (194, 1036, 52, 52)
skill_right: Tuple[int, int, int, int] = (1674, 1036, 52, 52)
@dataclass
class D2RColors:
"""HSV color ranges for game element detection."""
health_filled: Tuple[Tuple[int, int, int], Tuple[int, int, int]] = (
(0, 100, 100), (10, 255, 255) # Red
)
mana_filled: Tuple[Tuple[int, int, int], Tuple[int, int, int]] = (
(100, 100, 100), (130, 255, 255) # Blue
)
item_unique: Tuple[Tuple[int, int, int], Tuple[int, int, int]] = (
(15, 100, 180), (30, 255, 255) # Gold/unique
)
item_set: Tuple[Tuple[int, int, int], Tuple[int, int, int]] = (
(35, 100, 150), (55, 255, 255) # Green/set
)
item_rare: Tuple[Tuple[int, int, int], Tuple[int, int, int]] = (
(15, 50, 200), (25, 150, 255) # Yellow/rare
)
portal_blue: Tuple[Tuple[int, int, int], Tuple[int, int, int]] = (
(90, 150, 150), (120, 255, 255) # Town portal blue
)
@dataclass
class D2RTimings:
"""Game-specific timing constants in seconds."""
loading_screen_max: float = 15.0
town_portal_cast: float = 3.5
teleport_delay: float = 0.15
potion_cooldown: float = 1.0
click_delay: float = 0.1
pickup_delay: float = 0.3
vendor_interaction: float = 0.5
@dataclass
class D2RConfig:
"""Master D2R configuration."""
resolution: Tuple[int, int] = (1920, 1080)
regions: D2RScreenRegions = field(default_factory=D2RScreenRegions)
colors: D2RColors = field(default_factory=D2RColors)
timings: D2RTimings = field(default_factory=D2RTimings)
# Loot filter
pickup_uniques: bool = True
pickup_sets: bool = True
pickup_rares: bool = True
pickup_runes: bool = True
min_rune_tier: int = 10 # Lem+
pickup_gems: bool = False
# Safety
health_potion_threshold: float = 0.5
mana_potion_threshold: float = 0.3
chicken_threshold: float = 0.2 # Exit game if health below this

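These pickup booleans feed the declarative rule-based loot filter the README advertises (pkg/engine/loot). A minimal Go sketch of what such rules could look like; every name here is hypothetical, since the real rule schema is not part of this chunk:

```go
package main

import "fmt"

// Rule and Item sketch the declarative loot-filter idea.
// All field names are hypothetical, not from pkg/engine/loot.
type Rule struct {
	Quality string // "unique", "set", "rare", "rune", ...
	MinTier int    // for runes: lowest tier worth picking up
}

type Item struct {
	Quality string
	Tier    int
}

// Matches reports whether an item satisfies a single rule.
func (r Rule) Matches(it Item) bool {
	return it.Quality == r.Quality && it.Tier >= r.MinTier
}

// ShouldPickup returns true if any rule matches the item.
func ShouldPickup(rules []Rule, it Item) bool {
	for _, r := range rules {
		if r.Matches(it) {
			return true
		}
	}
	return false
}

func main() {
	rules := []Rule{{Quality: "unique"}, {Quality: "rune", MinTier: 10}}
	fmt.Println(ShouldPickup(rules, Item{Quality: "rune", Tier: 12})) // high rune: keep
	fmt.Println(ShouldPickup(rules, Item{Quality: "rune", Tier: 3}))  // low rune: skip
}
```

Keeping rules as data rather than code is what lets the API expose GET/PUT /api/loot/rules for live editing from the dashboard.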
@@ -1,97 +0,0 @@
"""Main D2R bot class — orchestrates the bot loop.
Entry point for the Diablo II: Resurrected bot. Manages the main
game loop, state transitions, and routine execution.
"""
import logging
import time
from typing import Optional
from engine.screen.capture import ScreenCapture
from engine.input.humanize import Humanizer
from engine.state.events import EventBus
from engine.safety.timing import SessionTimer
from games.d2r.config import D2RConfig
from games.d2r.screens.ingame import InGameDetector
from games.d2r.screens.menu import MenuDetector
logger = logging.getLogger(__name__)
class D2RBot:
"""Main Diablo II: Resurrected bot."""
def __init__(self, config: Optional[D2RConfig] = None):
self.config = config or D2RConfig()
self.screen = ScreenCapture()
self.humanizer = Humanizer()
self.events = EventBus()
self.session_timer = SessionTimer()
self.menu_detector = MenuDetector(self.config)
self.ingame_detector = InGameDetector(self.config)
self._running = False
self._current_routine = None
def start(self, routine_name: str = "mephisto") -> None:
"""Start the bot with a specific farming routine."""
logger.info(f"Starting D2R bot with routine: {routine_name}")
self._running = True
try:
self._main_loop(routine_name)
except KeyboardInterrupt:
logger.info("Bot stopped by user")
except Exception as e:
logger.error(f"Bot error: {e}", exc_info=True)
finally:
self._running = False
def stop(self) -> None:
"""Signal the bot to stop."""
self._running = False
def _main_loop(self, routine_name: str) -> None:
"""Core bot loop."""
while self._running:
# Check session timing
if self.session_timer.should_stop_session():
break_duration = self.session_timer.get_break_duration()
logger.info(f"Session break: {break_duration/60:.0f} min")
time.sleep(break_duration)
self.session_timer.start_new_session()
# Check for breaks
break_time = self.humanizer.should_take_break()
if break_time:
time.sleep(break_time)
# Capture screen
frame = self.screen.capture_screen()
# Detect state and act
# TODO: Implement state detection and routine execution
# Small delay between loop iterations
time.sleep(0.05)
def main():
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(description="D2R Bot")
parser.add_argument("--routine", default="mephisto", choices=["mephisto", "pindle", "countess"])
parser.add_argument("--resolution", default="1920x1080")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
config = D2RConfig()
bot = D2RBot(config)
bot.start(args.routine)
if __name__ == "__main__":
main()

@@ -1,84 +0,0 @@
"""Countess farming routine for D2R.
Rune farming: Create game → Black Marsh WP → Tower Cellar →
Kill Countess → Loot runes → Exit → Repeat
"""
import logging
from enum import Enum, auto
logger = logging.getLogger(__name__)
class CountessPhase(Enum):
CREATE_GAME = auto()
WAYPOINT_TO_MARSH = auto()
FIND_TOWER = auto()
NAVIGATE_CELLAR = auto()
KILL_COUNTESS = auto()
LOOT = auto()
EXIT_GAME = auto()
class CountessRoutine:
"""Automated Countess farming for rune drops.
Best route for mid-tier rune farming (up to Ist).
Requires navigating 5 tower cellar levels.
"""
def __init__(self, bot):
self.bot = bot
self.phase = CountessPhase.CREATE_GAME
self.run_count = 0
def execute_run(self) -> bool:
"""Execute a single Countess run."""
logger.info(f"Starting Countess run #{self.run_count + 1}")
phases = [
(CountessPhase.CREATE_GAME, self._create_game),
(CountessPhase.WAYPOINT_TO_MARSH, self._go_to_marsh),
(CountessPhase.FIND_TOWER, self._find_tower),
(CountessPhase.NAVIGATE_CELLAR, self._navigate_cellar),
(CountessPhase.KILL_COUNTESS, self._kill_countess),
(CountessPhase.LOOT, self._loot_runes),
(CountessPhase.EXIT_GAME, self._exit_game),
]
for phase, handler in phases:
self.phase = phase
if not handler():
return False
self.bot.humanizer.wait()
self.run_count += 1
return True
def _create_game(self) -> bool:
return True
def _go_to_marsh(self) -> bool:
"""Take waypoint to Black Marsh."""
return True
def _find_tower(self) -> bool:
"""Navigate from Black Marsh to Forgotten Tower entrance."""
# TODO: This is the hardest part — tower location is random
return True
def _navigate_cellar(self) -> bool:
"""Navigate through 5 cellar levels to level 5."""
# TODO: Find stairs on each level, descend
return True
def _kill_countess(self) -> bool:
"""Kill the Countess."""
return True
def _loot_runes(self) -> bool:
"""Pick up rune drops (Countess has special rune drop table)."""
return True
def _exit_game(self) -> bool:
return True

@@ -1,105 +0,0 @@
"""Mephisto farming routine for D2R.
Classic Mephisto run: Create game → Teleport to Durance 3 →
Kill Mephisto → Loot → Exit → Repeat
"""
import logging
import time
from enum import Enum, auto
logger = logging.getLogger(__name__)
class MephistoPhase(Enum):
CREATE_GAME = auto()
TELEPORT_TO_DURANCE = auto()
FIND_MEPHISTO = auto()
KILL_MEPHISTO = auto()
LOOT = auto()
TOWN_PORTAL = auto()
STASH_ITEMS = auto()
EXIT_GAME = auto()
class MephistoRoutine:
"""Automated Mephisto farming runs.
Designed for Sorceress with Teleport. Can be adapted for
other classes with Enigma runeword.
"""
def __init__(self, bot):
self.bot = bot
self.phase = MephistoPhase.CREATE_GAME
self.run_count = 0
self.items_found = 0
def execute_run(self) -> bool:
"""Execute a single Mephisto run. Returns True if successful."""
logger.info(f"Starting Mephisto run #{self.run_count + 1}")
phases = [
(MephistoPhase.CREATE_GAME, self._create_game),
(MephistoPhase.TELEPORT_TO_DURANCE, self._teleport_to_durance),
(MephistoPhase.FIND_MEPHISTO, self._find_mephisto),
(MephistoPhase.KILL_MEPHISTO, self._kill_mephisto),
(MephistoPhase.LOOT, self._loot_items),
(MephistoPhase.TOWN_PORTAL, self._town_portal),
(MephistoPhase.STASH_ITEMS, self._stash_items),
(MephistoPhase.EXIT_GAME, self._exit_game),
]
for phase, handler in phases:
self.phase = phase
logger.debug(f"Phase: {phase.name}")
if not handler():
logger.warning(f"Phase {phase.name} failed")
return False
self.bot.humanizer.wait()
self.run_count += 1
logger.info(f"Run #{self.run_count} complete. Total items: {self.items_found}")
return True
def _create_game(self) -> bool:
"""Create a new game."""
# TODO: Navigate menu → create game with random name
return True
def _teleport_to_durance(self) -> bool:
"""Teleport from Act 3 town to Durance of Hate Level 3."""
# TODO: Navigate waypoint → Durance 2 → teleport to Durance 3
return True
def _find_mephisto(self) -> bool:
"""Locate Mephisto on Durance 3."""
# TODO: Teleport around to find Mephisto (moat trick position)
return True
def _kill_mephisto(self) -> bool:
"""Kill Mephisto using appropriate skill rotation."""
# TODO: Position at moat trick spot, cast spells
return True
def _loot_items(self) -> bool:
"""Pick up valuable items."""
# TODO: Detect and pick up items based on loot filter
return True
def _town_portal(self) -> bool:
"""Cast town portal and go to town."""
# TODO: Cast TP, click portal
return True
def _stash_items(self) -> bool:
"""Stash items if inventory is getting full."""
# TODO: Open stash, transfer items
return True
def _exit_game(self) -> bool:
"""Exit the current game."""
# TODO: Save & Exit
return True

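All three routines share the same (phase, handler) table pattern; in the Go rewrite this could be expressed as a small phase runner (a sketch under that assumption, names hypothetical):

```go
package main

import "fmt"

// phase pairs a name with a handler, mirroring the (phase, handler)
// tables used by the Python routines above (names hypothetical).
type phase struct {
	name    string
	handler func() bool
}

// runPhases executes handlers in order and stops at the first failure,
// just like the routines' execute_run loops.
func runPhases(phases []phase) bool {
	for _, p := range phases {
		if !p.handler() {
			fmt.Println("phase failed:", p.name)
			return false
		}
	}
	return true
}

func main() {
	ok := runPhases([]phase{
		{"create_game", func() bool { return true }},
		{"kill_boss", func() bool { return true }},
		{"loot", func() bool { return true }},
	})
	fmt.Println("run complete:", ok)
}
```

A table of named handlers keeps each routine declarative: adding a farming route means listing its phases, and the runner gets failure logging and per-phase humanizer waits for free.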
@@ -1,80 +0,0 @@
"""Pindleskin farming routine for D2R.
Fastest MF run: Create game → Take red portal in Harrogath →
Kill Pindleskin → Loot → Exit → Repeat
"""
import logging
from enum import Enum, auto
logger = logging.getLogger(__name__)
class PindlePhase(Enum):
CREATE_GAME = auto()
TAKE_PORTAL = auto()
FIND_PINDLE = auto()
KILL_PINDLE = auto()
LOOT = auto()
EXIT_GAME = auto()
class PindleRoutine:
"""Automated Pindleskin farming runs.
The simplest and fastest MF route. Requires Act 5 red portal
(Anya quest completed). Works with any class.
"""
def __init__(self, bot):
self.bot = bot
self.phase = PindlePhase.CREATE_GAME
self.run_count = 0
def execute_run(self) -> bool:
"""Execute a single Pindleskin run."""
logger.info(f"Starting Pindle run #{self.run_count + 1}")
phases = [
(PindlePhase.CREATE_GAME, self._create_game),
(PindlePhase.TAKE_PORTAL, self._take_red_portal),
(PindlePhase.FIND_PINDLE, self._find_pindle),
(PindlePhase.KILL_PINDLE, self._kill_pindle),
(PindlePhase.LOOT, self._loot_items),
(PindlePhase.EXIT_GAME, self._exit_game),
]
for phase, handler in phases:
self.phase = phase
if not handler():
return False
self.bot.humanizer.wait()
self.run_count += 1
return True
def _create_game(self) -> bool:
return True
def _take_red_portal(self) -> bool:
"""Navigate to and enter the red portal near Anya."""
# TODO: Find red portal in Harrogath, click it
return True
def _find_pindle(self) -> bool:
"""Locate Pindleskin in Nihlathak's Temple entrance."""
# TODO: Move toward Pindle's fixed spawn location
return True
def _kill_pindle(self) -> bool:
"""Kill Pindleskin and his minions."""
# TODO: Attack routine
return True
def _loot_items(self) -> bool:
"""Pick up valuable drops."""
return True
def _exit_game(self) -> bool:
"""Exit game."""
return True

@@ -1,89 +0,0 @@
"""In-game state detection for D2R.
Detects health/mana, location, enemies, items on ground, etc.
"""
from typing import Optional, List, Tuple
import logging
import numpy as np
from engine.vision.color import ColorAnalyzer
from engine.vision.detector import ElementDetector, Detection
from games.d2r.config import D2RConfig
logger = logging.getLogger(__name__)
class InGameDetector:
"""Detects in-game state from screen captures."""
def __init__(self, config: D2RConfig):
self.config = config
self.color = ColorAnalyzer()
self.detector = ElementDetector()
def is_in_game(self, screen: np.ndarray) -> bool:
"""Check if we're in an active game (health/mana orbs visible)."""
health = self.get_health_percentage(screen)
return health > 0
def get_health_percentage(self, screen: np.ndarray) -> float:
"""Read current health from the health orb."""
return self.color.read_bar_percentage(
screen,
self.config.regions.health_orb,
self.config.colors.health_filled,
)
def get_mana_percentage(self, screen: np.ndarray) -> float:
"""Read current mana from the mana orb."""
return self.color.read_bar_percentage(
screen,
self.config.regions.mana_orb,
self.config.colors.mana_filled,
)
def is_dead(self, screen: np.ndarray) -> bool:
"""Check if character is dead."""
# Health at 0 + death screen elements
return self.get_health_percentage(screen) == 0
def should_use_health_potion(self, screen: np.ndarray) -> bool:
"""Check if health is below potion threshold."""
return self.get_health_percentage(screen) < self.config.health_potion_threshold
def should_chicken(self, screen: np.ndarray) -> bool:
"""Check if health is critically low (exit game)."""
return self.get_health_percentage(screen) < self.config.chicken_threshold
def find_items_on_ground(self, screen: np.ndarray) -> List[Detection]:
"""Detect item labels on the ground."""
items = []
if self.config.pickup_uniques:
items.extend(self.detector.find_by_color(
screen, *self.config.colors.item_unique,
min_area=50, label="unique",
))
if self.config.pickup_sets:
items.extend(self.detector.find_by_color(
screen, *self.config.colors.item_set,
min_area=50, label="set",
))
return items
def find_portal(self, screen: np.ndarray) -> Optional[Detection]:
"""Detect a town portal on screen."""
portals = self.detector.find_by_color(
screen, *self.config.colors.portal_blue,
min_area=200, label="portal",
)
return portals[0] if portals else None
def is_inventory_open(self, screen: np.ndarray) -> bool:
"""Check if the inventory panel is open."""
# TODO: Template match inventory panel
return False

@@ -1,40 +0,0 @@
"""Inventory management for D2R.
Handles inventory scanning, item identification, stash management.
"""
from typing import List, Optional, Tuple
import logging
import numpy as np
from engine.vision.detector import ElementDetector, Detection
from games.d2r.config import D2RConfig
logger = logging.getLogger(__name__)
class InventoryManager:
"""Manages inventory state and item operations."""
def __init__(self, config: D2RConfig):
self.config = config
self.detector = ElementDetector()
def is_full(self, screen: np.ndarray) -> bool:
"""Check if inventory is full."""
# TODO: Scan inventory grid for empty slots
return False
def find_empty_slot(self, screen: np.ndarray) -> Optional[Tuple[int, int]]:
"""Find an empty inventory slot."""
# TODO: Grid scanning
return None
def count_items(self, screen: np.ndarray) -> int:
"""Count items in inventory."""
return 0
def should_go_to_town(self, screen: np.ndarray) -> bool:
"""Check if inventory is full enough to warrant a town trip."""
return self.is_full(screen)

@@ -1,45 +0,0 @@
"""Main menu and character select screen detection for D2R."""
from typing import Optional
import logging
import numpy as np
from engine.vision.detector import ElementDetector, Detection
from games.d2r.config import D2RConfig
logger = logging.getLogger(__name__)
class MenuDetector:
"""Detects D2R menu screens (main menu, character select, lobby)."""
def __init__(self, config: D2RConfig):
self.config = config
self.detector = ElementDetector()
def is_main_menu(self, screen: np.ndarray) -> bool:
"""Check if we're on the main menu."""
# TODO: Template match for main menu elements
return False
def is_character_select(self, screen: np.ndarray) -> bool:
"""Check if we're on character select screen."""
return False
def is_lobby(self, screen: np.ndarray) -> bool:
"""Check if we're in the game lobby."""
return False
def is_loading(self, screen: np.ndarray) -> bool:
"""Check if a loading screen is active."""
return False
def select_character(self, screen: np.ndarray, char_index: int = 0) -> Optional[Detection]:
"""Find and return the character slot to click."""
# TODO: Detect character slots
return None
def find_create_game_button(self, screen: np.ndarray) -> Optional[Detection]:
"""Find the create game button."""
return None

go.mod Normal file
@@ -0,0 +1,3 @@
module git.cloonar.com/openclawd/iso-bot
go 1.23

pkg/api/api.go Normal file
@@ -0,0 +1,93 @@
// Package api provides the REST + WebSocket API for the bot dashboard.
//
// Endpoints:
// GET /api/status — bot status, current state, routine, stats
// GET /api/config — current configuration
// PUT /api/config — update configuration
// POST /api/start — start bot with routine
// POST /api/stop — stop bot
// POST /api/pause — pause/resume
// GET /api/routines — list available routines
// GET /api/loot/rules — get loot filter rules
// PUT /api/loot/rules — update loot filter rules
// GET /api/stats — run statistics, items found, etc.
// WS /api/ws — real-time status stream
//
// The API is served by the bot process itself (single binary).
package api
import (
"encoding/json"
"net/http"
"sync"
)
// Status represents the current bot status.
type Status struct {
Running bool `json:"running"`
Paused bool `json:"paused"`
GameState string `json:"gameState"`
Routine string `json:"routine,omitempty"`
Phase string `json:"phase,omitempty"`
RunCount int `json:"runCount"`
ItemsFound int `json:"itemsFound"`
Uptime string `json:"uptime"`
CaptureFPS float64 `json:"captureFps"`
HealthPct float64 `json:"healthPct"`
ManaPct float64 `json:"manaPct"`
}
// Server provides the HTTP API and WebSocket endpoint.
type Server struct {
mu sync.RWMutex
status Status
addr string
mux *http.ServeMux
}
// NewServer creates an API server on the given address.
func NewServer(addr string) *Server {
s := &Server{
addr: addr,
mux: http.NewServeMux(),
}
s.registerRoutes()
return s
}
// Start begins serving the API.
func (s *Server) Start() error {
return http.ListenAndServe(s.addr, s.mux)
}
// UpdateStatus updates the bot status (called by the engine).
func (s *Server) UpdateStatus(status Status) {
s.mu.Lock()
defer s.mu.Unlock()
s.status = status
// TODO: Broadcast to WebSocket clients
}
func (s *Server) registerRoutes() {
s.mux.HandleFunc("GET /api/status", s.handleStatus)
s.mux.HandleFunc("POST /api/start", s.handleStart)
s.mux.HandleFunc("POST /api/stop", s.handleStop)
// TODO: Remaining routes
}
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
s.mu.RLock()
defer s.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(s.status)
}
func (s *Server) handleStart(w http.ResponseWriter, r *http.Request) {
// TODO: Signal engine to start
w.WriteHeader(http.StatusAccepted)
}
func (s *Server) handleStop(w http.ResponseWriter, r *http.Request) {
// TODO: Signal engine to stop
w.WriteHeader(http.StatusAccepted)
}

@@ -0,0 +1,99 @@
// Package capture provides screen capture from various sources.
//
// Supports capturing from:
// - Local window (by title or handle)
// - VM display (VNC, Spice, or VM window on host)
// - Full screen / monitor region
//
// The capture interface is source-agnostic — the engine doesn't care
// where the frames come from.
package capture
import (
"image"
"time"
)
// Region defines a rectangular area to capture.
type Region struct {
X, Y, Width, Height int
}
// Source represents a capture source (window, VM, screen, etc.)
type Source interface {
// Name returns a human-readable description of the source.
Name() string
// Capture grabs a single frame.
Capture() (image.Image, error)
// CaptureRegion grabs a sub-region of the source.
CaptureRegion(r Region) (image.Image, error)
// Size returns the source dimensions.
Size() (width, height int)
// Close releases resources.
Close() error
}
// Stats tracks capture performance metrics.
type Stats struct {
FrameCount uint64
AvgCaptureMs float64
FPS float64
LastCapture time.Time
}
// Manager handles screen capture with performance tracking.
type Manager struct {
source Source
stats Stats
}
// NewManager creates a capture manager with the given source.
func NewManager(source Source) *Manager {
return &Manager{source: source}
}
// Capture grabs a frame and updates performance stats.
func (m *Manager) Capture() (image.Image, error) {
start := time.Now()
frame, err := m.source.Capture()
if err != nil {
return nil, err
}
elapsed := time.Since(start)
m.stats.FrameCount++
m.stats.LastCapture = start
// Rolling average
alpha := 0.1
ms := float64(elapsed.Microseconds()) / 1000.0
if m.stats.FrameCount == 1 {
m.stats.AvgCaptureMs = ms
} else {
m.stats.AvgCaptureMs = m.stats.AvgCaptureMs*(1-alpha) + ms*alpha
}
if m.stats.AvgCaptureMs > 0 {
m.stats.FPS = 1000.0 / m.stats.AvgCaptureMs
}
return frame, nil
}
// CaptureRegion grabs a sub-region.
func (m *Manager) CaptureRegion(r Region) (image.Image, error) {
return m.source.CaptureRegion(r)
}
// Stats returns current capture performance stats.
func (m *Manager) Stats() Stats {
return m.stats
}
// Close releases the capture source.
func (m *Manager) Close() error {
return m.source.Close()
}

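The rolling average above is an exponential moving average with alpha = 0.1, seeded with the first sample. Isolated as a standalone sketch:

```go
package main

import "fmt"

// ema applies the same smoothing as Manager.Capture: seeded with the
// first sample, then new = old*(1-alpha) + sample*alpha.
func ema(samples []float64, alpha float64) float64 {
	var avg float64
	for i, s := range samples {
		if i == 0 {
			avg = s
		} else {
			avg = avg*(1-alpha) + s*alpha
		}
	}
	return avg
}

func main() {
	// One 30 ms outlier among 10 ms frames barely moves the average.
	avg := ema([]float64{10, 10, 10, 30}, 0.1)
	fmt.Printf("avg=%.1f ms, ~%.0f FPS\n", avg, 1000/avg)
}
```

The low alpha keeps the reported FPS stable when a single slow capture (e.g. during a loading screen) would otherwise spike the metric.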
@@ -0,0 +1,103 @@
// Package input — humanize.go provides human-like behavior randomization.
package input
import (
"image"
"math/rand"
"sync/atomic"
"time"
)
// HumanProfile defines behavior parameters for human-like input.
type HumanProfile struct {
ReactionMin time.Duration // min reaction delay
ReactionMax time.Duration // max reaction delay
MouseSpeedMin float64 // min pixels per second
MouseSpeedMax float64 // max pixels per second
ClickJitter int // pixel jitter on clicks
HesitationChance float64 // chance of extra pause (0-1)
HesitationMin time.Duration
HesitationMax time.Duration
}
// DefaultProfile returns a realistic human behavior profile.
func DefaultProfile() HumanProfile {
return HumanProfile{
ReactionMin: 150 * time.Millisecond,
ReactionMax: 450 * time.Millisecond,
MouseSpeedMin: 400,
MouseSpeedMax: 1200,
ClickJitter: 3,
HesitationChance: 0.1,
HesitationMin: 300 * time.Millisecond,
HesitationMax: 1200 * time.Millisecond,
}
}
// Humanizer applies human-like randomization to bot actions.
type Humanizer struct {
Profile HumanProfile
actionCount atomic.Int64
}
// NewHumanizer creates a humanizer with the given profile.
func NewHumanizer(profile HumanProfile) *Humanizer {
return &Humanizer{Profile: profile}
}
// ReactionDelay returns a randomized human-like reaction delay.
func (h *Humanizer) ReactionDelay() time.Duration {
base := h.Profile.ReactionMin + randDuration(h.Profile.ReactionMax-h.Profile.ReactionMin)
// Occasional hesitation
if rand.Float64() < h.Profile.HesitationChance {
base += h.Profile.HesitationMin + randDuration(h.Profile.HesitationMax-h.Profile.HesitationMin)
}
// Slight fatigue factor
actions := float64(h.actionCount.Load())
fatigue := min(actions/1000.0, 0.3)
base = time.Duration(float64(base) * (1 + fatigue*rand.Float64()))
return base
}
// randDuration returns a uniform duration in [0, span). It guards
// span <= 0, since rand.Int63n panics on non-positive arguments
// (e.g. a profile with ReactionMin == ReactionMax).
func randDuration(span time.Duration) time.Duration {
if span <= 0 {
return 0
}
return time.Duration(rand.Int63n(int64(span)))
}
// Wait pauses for a human-like reaction delay.
func (h *Humanizer) Wait() {
time.Sleep(h.ReactionDelay())
h.actionCount.Add(1)
}
// WaitMs pauses for baseMs ± varianceMs with human randomization.
func (h *Humanizer) WaitMs(baseMs, varianceMs int) {
actual := baseMs + rand.Intn(2*varianceMs+1) - varianceMs
if actual < 0 {
actual = 0
}
time.Sleep(time.Duration(actual) * time.Millisecond)
}
// JitterPosition adds random offset to a click position.
func (h *Humanizer) JitterPosition(pos image.Point) image.Point {
j := h.Profile.ClickJitter
return image.Point{
X: pos.X + rand.Intn(2*j+1) - j,
Y: pos.Y + rand.Intn(2*j+1) - j,
}
}
// MouseSpeed returns a randomized mouse movement speed.
func (h *Humanizer) MouseSpeed() float64 {
return h.Profile.MouseSpeedMin + rand.Float64()*(h.Profile.MouseSpeedMax-h.Profile.MouseSpeedMin)
}
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}

162
pkg/engine/input/input.go Normal file
View file

@ -0,0 +1,162 @@
// Package input provides human-like mouse and keyboard simulation.
//
// Mouse movement uses Bézier curves with natural acceleration.
// All inputs include randomized timing to mimic human behavior.
// Platform-specific backends: SendInput (Windows), X11/uinput (Linux).
package input
import (
"image"
"math"
"math/rand"
"time"
)
// MouseController handles human-like mouse movement and clicks.
type MouseController struct {
humanizer *Humanizer
}
// NewMouseController creates a mouse controller with the given humanizer.
func NewMouseController(h *Humanizer) *MouseController {
return &MouseController{humanizer: h}
}
// MoveTo moves the mouse to target using a Bézier curve.
func (m *MouseController) MoveTo(target image.Point) {
// Get current position
current := m.GetPosition()
// Generate Bézier control points
points := m.bezierPath(current, target)
// Animate along the path
speed := m.humanizer.MouseSpeed()
totalDist := m.pathLength(points)
steps := int(totalDist / speed * 1000) // ms-based steps
if steps < 5 {
steps = 5
}
for i := 1; i <= steps; i++ {
t := float64(i) / float64(steps)
// Ease in-out for natural acceleration
t = m.easeInOut(t)
p := m.evalBezier(points, t)
m.setPosition(p)
time.Sleep(time.Millisecond)
}
}
// Click performs a mouse click at the current position.
func (m *MouseController) Click() {
m.humanizer.Wait()
// TODO: Platform-specific click (SendInput / X11)
// Randomize hold duration
holdMs := 50 + rand.Intn(80)
time.Sleep(time.Duration(holdMs) * time.Millisecond)
}
// ClickAt moves to position and clicks.
func (m *MouseController) ClickAt(pos image.Point) {
jittered := m.humanizer.JitterPosition(pos)
m.MoveTo(jittered)
m.Click()
}
// RightClick performs a right mouse click.
func (m *MouseController) RightClick() {
m.humanizer.Wait()
// TODO: Platform-specific right click (SendInput / X11)
holdMs := 50 + rand.Intn(80)
time.Sleep(time.Duration(holdMs) * time.Millisecond)
}
// GetPosition returns current mouse position.
func (m *MouseController) GetPosition() image.Point {
// TODO: Platform-specific implementation
return image.Point{}
}
func (m *MouseController) setPosition(p image.Point) {
// TODO: Platform-specific implementation
}
// bezierPath generates a cubic Bézier curve with randomized control points.
func (m *MouseController) bezierPath(start, end image.Point) [4]image.Point {
dx := float64(end.X - start.X)
dy := float64(end.Y - start.Y)
// Randomize control points for natural curvature
cp1 := image.Point{
X: start.X + int(dx*0.25+rand.Float64()*50-25),
Y: start.Y + int(dy*0.25+rand.Float64()*50-25),
}
cp2 := image.Point{
X: start.X + int(dx*0.75+rand.Float64()*50-25),
Y: start.Y + int(dy*0.75+rand.Float64()*50-25),
}
return [4]image.Point{start, cp1, cp2, end}
}
// evalBezier evaluates a cubic Bézier curve at parameter t.
func (m *MouseController) evalBezier(pts [4]image.Point, t float64) image.Point {
u := 1 - t
return image.Point{
X: int(u*u*u*float64(pts[0].X) + 3*u*u*t*float64(pts[1].X) + 3*u*t*t*float64(pts[2].X) + t*t*t*float64(pts[3].X)),
Y: int(u*u*u*float64(pts[0].Y) + 3*u*u*t*float64(pts[1].Y) + 3*u*t*t*float64(pts[2].Y) + t*t*t*float64(pts[3].Y)),
}
}
// easeInOut applies ease-in-out for natural mouse acceleration.
func (m *MouseController) easeInOut(t float64) float64 {
return t * t * (3 - 2*t)
}
func (m *MouseController) pathLength(pts [4]image.Point) float64 {
length := 0.0
prev := pts[0]
for i := 1; i <= 20; i++ {
t := float64(i) / 20.0
p := m.evalBezier(pts, t)
dx := float64(p.X - prev.X)
dy := float64(p.Y - prev.Y)
length += math.Sqrt(dx*dx + dy*dy)
prev = p
}
return length
}
// KeyboardController handles human-like keyboard input.
type KeyboardController struct {
humanizer *Humanizer
}
// NewKeyboardController creates a keyboard controller.
func NewKeyboardController(h *Humanizer) *KeyboardController {
return &KeyboardController{humanizer: h}
}
// PressKey presses and releases a key with human-like timing.
func (k *KeyboardController) PressKey(key string) {
k.humanizer.Wait()
// TODO: Platform-specific key press
holdMs := 30 + rand.Intn(70)
time.Sleep(time.Duration(holdMs) * time.Millisecond)
}
// TypeText types text with randomized inter-key delays.
func (k *KeyboardController) TypeText(text string) {
for _, ch := range text {
k.PressKey(string(ch))
delay := 30 + rand.Intn(120) // 30-150ms between keys
time.Sleep(time.Duration(delay) * time.Millisecond)
}
}
// HoldKey holds a key down for a duration.
func (k *KeyboardController) HoldKey(key string, durationMs int) {
// TODO: Platform-specific key down/up
variance := 0
if durationMs >= 5 { // guard: rand.Intn panics on a non-positive argument
variance = rand.Intn(durationMs / 5)
}
time.Sleep(time.Duration(durationMs+variance) * time.Millisecond)
}

120
pkg/engine/loot/filter.go Normal file
View file

@ -0,0 +1,120 @@
// Package loot provides a declarative, rule-based loot filter engine.
//
// Loot rules are defined in YAML and evaluated against detected items.
// Supports complex matching on type, name, rarity, properties, and more.
package loot
import (
"strconv"
"strings"
"git.cloonar.com/openclawd/iso-bot/pkg/plugin"
)
// Priority levels for pickup ordering.
const (
PriorityLow = 1
PriorityNormal = 5
PriorityHigh = 8
PriorityCritical = 10
)
// Condition defines a match condition for a loot rule.
type Condition struct {
Type *string `yaml:"type,omitempty"` // "unique", "set", "rare", etc.
NameContains *string `yaml:"name_contains,omitempty"` // substring match
NameExact *string `yaml:"name_exact,omitempty"` // exact match
MinRarity *int `yaml:"min_rarity,omitempty"` // minimum rarity tier
MaxRarity *int `yaml:"max_rarity,omitempty"`
BaseType *string `yaml:"base_type,omitempty"` // e.g., "Diadem", "Monarch"
HasProperty *string `yaml:"has_property,omitempty"` // item has this property key
PropertyGTE map[string]int `yaml:"property_gte,omitempty"` // property >= value
}
// Action defines what to do when a rule matches.
type Action string
const (
ActionPickup Action = "pickup"
ActionIgnore Action = "ignore"
ActionAlert Action = "alert" // pickup + send notification
)
// Rule is a single loot filter rule.
type Rule struct {
Name string `yaml:"name,omitempty"`
Match Condition `yaml:"match"`
Action Action `yaml:"action"`
Priority int `yaml:"priority,omitempty"`
}
// RuleEngine evaluates items against a list of rules.
type RuleEngine struct {
Rules []Rule
}
// NewRuleEngine creates a rule engine from a list of rules.
func NewRuleEngine(rules []Rule) *RuleEngine {
return &RuleEngine{Rules: rules}
}
// Evaluate checks an item against all rules.
// Returns the first matching rule's action and priority.
func (e *RuleEngine) Evaluate(item plugin.DetectedItem) (action Action, priority int, matched bool) {
for _, rule := range e.Rules {
if e.matches(rule.Match, item) {
p := rule.Priority
if p == 0 {
p = PriorityNormal
}
return rule.Action, p, true
}
}
return ActionIgnore, 0, false
}
// ShouldPickup implements plugin.LootFilter.
func (e *RuleEngine) ShouldPickup(item plugin.DetectedItem) (bool, int) {
action, priority, matched := e.Evaluate(item)
if !matched {
return false, 0
}
return action == ActionPickup || action == ActionAlert, priority
}
// ShouldAlert implements plugin.LootFilter.
func (e *RuleEngine) ShouldAlert(item plugin.DetectedItem) bool {
action, _, matched := e.Evaluate(item)
return matched && action == ActionAlert
}
func (e *RuleEngine) matches(cond Condition, item plugin.DetectedItem) bool {
if cond.Type != nil && !strings.EqualFold(item.Type, *cond.Type) {
return false
}
if cond.NameContains != nil && !strings.Contains(strings.ToLower(item.Name), strings.ToLower(*cond.NameContains)) {
return false
}
if cond.NameExact != nil && !strings.EqualFold(item.Name, *cond.NameExact) {
return false
}
if cond.MinRarity != nil && item.Rarity < *cond.MinRarity {
return false
}
if cond.MaxRarity != nil && item.Rarity > *cond.MaxRarity {
return false
}
if cond.BaseType != nil && !strings.EqualFold(item.Properties["base_type"], *cond.BaseType) {
return false
}
if cond.HasProperty != nil {
if _, ok := item.Properties[*cond.HasProperty]; !ok {
return false
}
}
for key, minVal := range cond.PropertyGTE {
raw, ok := item.Properties[key]
if !ok {
return false
}
val, err := strconv.Atoi(strings.TrimSpace(raw))
if err != nil || val < minVal {
return false
}
}
return true
}

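The first-match-wins evaluation order is the key contract of `RuleEngine.Evaluate`. This self-contained sketch mirrors it with local stand-ins for `plugin.DetectedItem` and the two `Condition` fields it exercises (type and minimum rarity):

```go
package main

import (
	"fmt"
	"strings"
)

type item struct {
	Name   string
	Type   string
	Rarity int
}

type rule struct {
	Name      string
	Type      string // empty matches any type
	MinRarity int
	Action    string
	Priority  int
}

// evaluate returns the first matching rule's action and priority,
// just like RuleEngine.Evaluate.
func evaluate(rules []rule, it item) (action string, priority int) {
	for _, r := range rules {
		if r.Type != "" && !strings.EqualFold(it.Type, r.Type) {
			continue
		}
		if it.Rarity < r.MinRarity {
			continue
		}
		return r.Action, r.Priority // first match wins
	}
	return "ignore", 0
}

func main() {
	rules := []rule{
		{Name: "High Runes", Type: "rune", MinRarity: 20, Action: "alert", Priority: 10},
		{Name: "All Uniques", Type: "unique", Action: "pickup", Priority: 8},
	}
	fmt.Println(evaluate(rules, item{Name: "Vex Rune", Type: "rune", Rarity: 26}))   // alert 10
	fmt.Println(evaluate(rules, item{Name: "Gold Pile", Type: "normal", Rarity: 0})) // ignore 0
}
```

Because evaluation is top-to-bottom, more specific rules must appear before broad ones in the YAML, which is exactly how the default filter below is ordered.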
108
pkg/engine/safety/safety.go Normal file
View file

@ -0,0 +1,108 @@
// Package safety provides anti-detection measures: session timing,
// break scheduling, and behavioral pattern randomization.
package safety
import (
"math/rand"
"time"
)
// SessionConfig defines play session parameters.
type SessionConfig struct {
MinSessionHours float64
MaxSessionHours float64
MinBreakMinutes float64
MaxBreakMinutes float64
MaxDailyHours float64
MicroBreakMinSec int
MicroBreakMaxSec int
MicroBreakIntervalMinSec int
MicroBreakIntervalMaxSec int
}
// DefaultSessionConfig returns realistic session timing.
func DefaultSessionConfig() SessionConfig {
return SessionConfig{
MinSessionHours: 1.0,
MaxSessionHours: 4.0,
MinBreakMinutes: 10.0,
MaxBreakMinutes: 45.0,
MaxDailyHours: 12.0,
MicroBreakMinSec: 2,
MicroBreakMaxSec: 8,
MicroBreakIntervalMinSec: 120,
MicroBreakIntervalMaxSec: 300,
}
}
// SessionTimer manages play session duration and breaks.
type SessionTimer struct {
config SessionConfig
sessionStart time.Time
dailyPlaytime time.Duration
targetDuration time.Duration
nextMicroBreak time.Time
}
// NewSessionTimer creates a session timer.
func NewSessionTimer(config SessionConfig) *SessionTimer {
st := &SessionTimer{
config: config,
sessionStart: time.Now(),
}
st.targetDuration = st.rollSessionDuration()
st.nextMicroBreak = st.scheduleMicroBreak()
return st
}
// ShouldStopSession returns true if the current session should end.
func (s *SessionTimer) ShouldStopSession() bool {
elapsed := time.Since(s.sessionStart)
if elapsed >= s.targetDuration {
return true
}
if s.dailyPlaytime+elapsed >= time.Duration(s.config.MaxDailyHours*float64(time.Hour)) {
return true
}
return false
}
// ShouldMicroBreak returns the break duration if it's time, or 0.
func (s *SessionTimer) ShouldMicroBreak() time.Duration {
if time.Now().After(s.nextMicroBreak) {
duration := time.Duration(s.config.MicroBreakMinSec+rand.Intn(s.config.MicroBreakMaxSec-s.config.MicroBreakMinSec+1)) * time.Second
s.nextMicroBreak = s.scheduleMicroBreak()
return duration
}
return 0
}
// BreakDuration returns a randomized long break duration.
func (s *SessionTimer) BreakDuration() time.Duration {
minMs := int(s.config.MinBreakMinutes * 60 * 1000)
maxMs := int(s.config.MaxBreakMinutes * 60 * 1000)
return time.Duration(minMs+rand.Intn(maxMs-minMs)) * time.Millisecond
}
// StartNewSession resets for a new play session.
func (s *SessionTimer) StartNewSession() {
s.dailyPlaytime += time.Since(s.sessionStart)
s.sessionStart = time.Now()
s.targetDuration = s.rollSessionDuration()
}
// Elapsed returns time in current session.
func (s *SessionTimer) Elapsed() time.Duration {
return time.Since(s.sessionStart)
}
func (s *SessionTimer) rollSessionDuration() time.Duration {
minMs := int(s.config.MinSessionHours * 3600 * 1000)
maxMs := int(s.config.MaxSessionHours * 3600 * 1000)
return time.Duration(minMs+rand.Intn(maxMs-minMs)) * time.Millisecond
}
func (s *SessionTimer) scheduleMicroBreak() time.Time {
interval := s.config.MicroBreakIntervalMinSec + rand.Intn(s.config.MicroBreakIntervalMaxSec-s.config.MicroBreakIntervalMinSec+1)
return time.Now().Add(time.Duration(interval) * time.Second)
}

86
pkg/engine/state/state.go Normal file
View file

@ -0,0 +1,86 @@
// Package state provides game state machine management with event callbacks.
package state
import (
"image"
"sync"
"time"
"git.cloonar.com/openclawd/iso-bot/pkg/plugin"
)
// Transition records a state change.
type Transition struct {
From plugin.GameState
To plugin.GameState
Timestamp time.Time
}
// Manager tracks game state and fires callbacks on transitions.
type Manager struct {
mu sync.RWMutex
current plugin.GameState
previous plugin.GameState
enterTime time.Time
history []Transition
callbacks map[plugin.GameState][]func(Transition)
detector plugin.GameDetector
}
// NewManager creates a state manager with the given detector.
func NewManager(detector plugin.GameDetector) *Manager {
return &Manager{
current: plugin.StateUnknown,
enterTime: time.Now(),
callbacks: make(map[plugin.GameState][]func(Transition)),
detector: detector,
}
}
// Update analyzes the current frame and updates state.
func (m *Manager) Update(frame image.Image) plugin.GameState {
newState := m.detector.DetectState(frame)
m.mu.Lock()
if newState == m.current {
m.mu.Unlock()
return newState
}
t := Transition{
From: m.current,
To: newState,
Timestamp: time.Now(),
}
m.history = append(m.history, t)
m.previous = m.current
m.current = newState
m.enterTime = t.Timestamp
cbs := append([]func(Transition){}, m.callbacks[newState]...)
m.mu.Unlock()
// Fire callbacks outside the lock: sync.RWMutex is not reentrant, so a
// callback that calls Current() or OnState() would otherwise deadlock.
for _, cb := range cbs {
cb(t)
}
return newState
}
// Current returns the current game state.
func (m *Manager) Current() plugin.GameState {
m.mu.RLock()
defer m.mu.RUnlock()
return m.current
}
// TimeInState returns how long we've been in the current state.
func (m *Manager) TimeInState() time.Duration {
m.mu.RLock()
defer m.mu.RUnlock()
return time.Since(m.enterTime)
}
// OnState registers a callback for entering a specific state.
func (m *Manager) OnState(state plugin.GameState, cb func(Transition)) {
m.mu.Lock()
defer m.mu.Unlock()
m.callbacks[state] = append(m.callbacks[state], cb)
}

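Typical usage of the state manager is to register reactions to transitions and call `Update` once per captured frame. This minimal standalone version shows the same callback pattern with plain strings in place of `plugin.GameState` (single-goroutine sketch, no locking):

```go
package main

import "fmt"

type machine struct {
	current   string
	callbacks map[string][]func(from, to string)
}

func (m *machine) on(state string, cb func(from, to string)) {
	m.callbacks[state] = append(m.callbacks[state], cb)
}

// update fires callbacks only on an actual transition, like Manager.Update.
func (m *machine) update(newState string) {
	if newState == m.current {
		return
	}
	from := m.current
	m.current = newState
	for _, cb := range m.callbacks[newState] {
		cb(from, newState)
	}
}

func main() {
	m := &machine{current: "unknown", callbacks: map[string][]func(from, to string){}}
	m.on("dead", func(from, to string) { fmt.Printf("died while in state %q\n", from) })
	m.update("in_game")
	m.update("in_game") // no transition, no callback
	m.update("dead")    // prints: died while in state "in_game"
}
```

In the real engine the bot would register handlers such as "on `StateDead`, release and restart the run" via `OnState`.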
View file

@ -0,0 +1,88 @@
// Package vision provides computer vision utilities for game screen analysis.
//
// Uses GoCV (OpenCV bindings for Go) for template matching, color detection,
// and contour analysis. Designed for high-throughput real-time analysis.
package vision
import (
"image"
"image/color"
)
// Match represents a detected element on screen.
type Match struct {
Position image.Point
BBox image.Rectangle
Confidence float64
Label string
}
// Template is a pre-loaded image template for matching.
type Template struct {
Name string
Image image.Image
Width int
Height int
}
// ColorRange defines an HSV color range for detection.
type ColorRange struct {
LowerH, LowerS, LowerV int
UpperH, UpperS, UpperV int
}
// Pipeline processes frames through a series of vision operations.
type Pipeline struct {
templates map[string]*Template
threshold float64
}
// NewPipeline creates a vision pipeline with the given confidence threshold.
func NewPipeline(threshold float64) *Pipeline {
return &Pipeline{
templates: make(map[string]*Template),
threshold: threshold,
}
}
// LoadTemplate loads a template image for matching.
func (p *Pipeline) LoadTemplate(name string, img image.Image) {
bounds := img.Bounds()
p.templates[name] = &Template{
Name: name,
Image: img,
Width: bounds.Dx(),
Height: bounds.Dy(),
}
}
// FindTemplate searches for a template in the frame.
// Returns the best match above threshold, or nil.
func (p *Pipeline) FindTemplate(frame image.Image, templateName string) *Match {
// TODO: Implement with GoCV matchTemplate
// This is a stub — actual implementation needs gocv.MatchTemplate
return nil
}
// FindAllTemplates finds all matches of a template above threshold.
func (p *Pipeline) FindAllTemplates(frame image.Image, templateName string) []Match {
// TODO: Implement with GoCV + NMS
return nil
}
// FindByColor detects regions matching an HSV color range.
func (p *Pipeline) FindByColor(frame image.Image, colorRange ColorRange, minArea int) []Match {
// TODO: Implement with GoCV inRange + findContours
return nil
}
// ReadBarPercentage reads a horizontal bar's fill level (health, mana, xp).
func (p *Pipeline) ReadBarPercentage(frame image.Image, barRegion image.Rectangle, filledColor ColorRange) float64 {
// TODO: Implement — scan columns for filled color ratio
return 0.0
}
// GetPixelColor returns the color at a specific pixel.
func (p *Pipeline) GetPixelColor(frame image.Image, x, y int) color.Color {
return frame.At(x, y)
}

148
pkg/plugin/plugin.go Normal file
View file

@ -0,0 +1,148 @@
// Package plugin defines the interfaces that game plugins must implement.
//
// The engine is game-agnostic. All game-specific logic lives in plugins
// that implement these interfaces. Adding a new game = a new plugin.
package plugin
import (
"context"
"image"
)
// GameState represents the current state of the game (menu, loading, in-game, etc.)
type GameState string
const (
StateUnknown GameState = "unknown"
StateLoading GameState = "loading"
StateMainMenu GameState = "main_menu"
StateCharacterSelect GameState = "character_select"
StateInGame GameState = "in_game"
StateInventory GameState = "inventory"
StateDead GameState = "dead"
StateDisconnected GameState = "disconnected"
)
// DetectedItem represents an item found on screen.
type DetectedItem struct {
Name string
Type string // "unique", "set", "rare", "rune", "normal"
Rarity int // game-specific rarity tier
Position image.Point
BBox image.Rectangle
Confidence float64
Properties map[string]string // parsed item properties
}
// VitalStats represents character health/mana/etc.
type VitalStats struct {
HealthPct float64 // 0.0 - 1.0
ManaPct float64
XPPct float64
}
// GameDetector detects the current game state from a screen capture.
type GameDetector interface {
// DetectState analyzes a screenshot and returns the current game state.
DetectState(frame image.Image) GameState
// ReadVitals reads health, mana, and other vital stats from the screen.
ReadVitals(frame image.Image) VitalStats
// IsInGame returns true if the player is in an active game session.
IsInGame(frame image.Image) bool
}
// ScreenReader extracts game information from screenshots.
type ScreenReader interface {
// FindItems detects item labels/drops on screen.
FindItems(frame image.Image) []DetectedItem
// FindPortal locates a town portal on screen.
FindPortal(frame image.Image) (image.Point, bool)
// FindEnemies detects enemy positions (optional, not all games need this).
FindEnemies(frame image.Image) []image.Point
// ReadText extracts text from a screen region (OCR).
ReadText(frame image.Image, region image.Rectangle) string
}
// Routine represents an automated game routine (e.g., a farming run).
type Routine interface {
// Name returns the routine's display name.
Name() string
// Run executes one iteration of the routine.
// Returns nil on success, error on failure (bot will handle recovery).
Run(ctx context.Context) error
// Phase returns the current phase name for status display.
Phase() string
}
// LootFilter decides which items to pick up.
type LootFilter interface {
// ShouldPickup evaluates an item against the filter rules.
ShouldPickup(item DetectedItem) (pickup bool, priority int)
// ShouldAlert returns true if this item warrants a notification.
ShouldAlert(item DetectedItem) bool
}
// Plugin is the main interface a game plugin must implement.
type Plugin interface {
// Info returns plugin metadata.
Info() PluginInfo
// Init initializes the plugin with engine services.
Init(services EngineServices) error
// Detector returns the game state detector.
Detector() GameDetector
// Reader returns the screen reader.
Reader() ScreenReader
// Routines returns available farming routines.
Routines() []Routine
// DefaultLootFilter returns the default loot filter.
DefaultLootFilter() LootFilter
}
// PluginInfo describes a game plugin.
type PluginInfo struct {
ID string // e.g., "d2r"
Name string // e.g., "Diablo II: Resurrected"
Version string
Description string
Resolution image.Point // target resolution, e.g., (1920, 1080)
}
// EngineServices provides access to engine capabilities for plugins.
type EngineServices interface {
// Capture returns the current screen frame.
Capture() image.Image
// Click sends a mouse click at the given position.
Click(pos image.Point)
// MoveMouse moves the mouse to the given position with human-like movement.
MoveMouse(pos image.Point)
// PressKey sends a key press.
PressKey(key string)
// TypeText types text with human-like delays.
TypeText(text string)
// Wait pauses for a human-like delay.
Wait()
// WaitMs pauses for a specific duration with randomization.
WaitMs(baseMs int, varianceMs int)
// Log logs a message associated with the plugin.
Log(level string, msg string, args ...any)
}

107
plugins/d2r/config.go Normal file
View file

@ -0,0 +1,107 @@
// D2R-specific configuration: screen regions, colors, timings.
package d2r
import "image"
// ScreenRegions defines UI element positions at 1920x1080.
type ScreenRegions struct {
HealthOrb image.Rectangle
ManaOrb image.Rectangle
XPBar image.Rectangle
Belt image.Rectangle
Minimap image.Rectangle
Inventory image.Rectangle
Stash image.Rectangle
SkillLeft image.Rectangle
SkillRight image.Rectangle
}
// HSVRange defines a color range in HSV space.
type HSVRange struct {
LowerH, LowerS, LowerV int
UpperH, UpperS, UpperV int
}
// Colors defines HSV ranges for game elements.
type Colors struct {
HealthFilled HSVRange
ManaFilled HSVRange
ItemUnique HSVRange
ItemSet HSVRange
ItemRare HSVRange
ItemRuneword HSVRange
PortalBlue HSVRange
}
// Timings defines game-specific delay constants.
type Timings struct {
LoadingScreenMaxMs int
TownPortalCastMs int
TeleportDelayMs int
PotionCooldownMs int
PickupDelayMs int
}
// Config holds all D2R-specific configuration.
type Config struct {
Resolution image.Point
Regions ScreenRegions
Colors Colors
Timings Timings
// Loot settings
PickupUniques bool
PickupSets bool
PickupRares bool
PickupRunes bool
MinRuneTier int
PickupGems bool
// Safety thresholds (0.0 - 1.0)
HealthPotionThreshold float64
ManaPotionThreshold float64
ChickenThreshold float64 // exit game if health below this
}
// DefaultConfig returns the default D2R config for 1920x1080.
func DefaultConfig() Config {
return Config{
Resolution: image.Point{X: 1920, Y: 1080},
Regions: ScreenRegions{
HealthOrb: image.Rect(28, 545, 198, 715),
ManaOrb: image.Rect(1722, 545, 1892, 715),
XPBar: image.Rect(0, 1058, 1920, 1080),
Belt: image.Rect(838, 1010, 1082, 1058),
Minimap: image.Rect(1600, 0, 1920, 320),
Inventory: image.Rect(960, 330, 1490, 770),
Stash: image.Rect(430, 330, 960, 770),
SkillLeft: image.Rect(194, 1036, 246, 1088),
SkillRight: image.Rect(1674, 1036, 1726, 1088),
},
Colors: Colors{
HealthFilled: HSVRange{0, 100, 100, 10, 255, 255},
ManaFilled: HSVRange{100, 100, 100, 130, 255, 255},
ItemUnique: HSVRange{15, 100, 180, 30, 255, 255},
ItemSet: HSVRange{35, 100, 150, 55, 255, 255},
ItemRare: HSVRange{15, 50, 200, 25, 150, 255},
ItemRuneword: HSVRange{15, 100, 180, 30, 255, 255},
PortalBlue: HSVRange{90, 150, 150, 120, 255, 255},
},
Timings: Timings{
LoadingScreenMaxMs: 15000,
TownPortalCastMs: 3500,
TeleportDelayMs: 150,
PotionCooldownMs: 1000,
PickupDelayMs: 300,
},
PickupUniques: true,
PickupSets: true,
PickupRares: true,
PickupRunes: true,
MinRuneTier: 10,
PickupGems: false,
HealthPotionThreshold: 0.5,
ManaPotionThreshold: 0.3,
ChickenThreshold: 0.2,
}
}

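The three safety thresholds in `Config` are meant to be checked against `VitalStats.HealthPct` every frame. The field names below come from `config.go`; the decision function itself is an assumed sketch, not code from this commit:

```go
package main

import "fmt"

// healthAction picks the most urgent response first: chicken (emergency
// game exit) beats drinking a potion, which beats doing nothing.
func healthAction(healthPct, potionThreshold, chickenThreshold float64) string {
	switch {
	case healthPct <= chickenThreshold:
		return "chicken"
	case healthPct <= potionThreshold:
		return "potion"
	default:
		return "none"
	}
}

func main() {
	// Defaults: HealthPotionThreshold=0.5, ChickenThreshold=0.2.
	fmt.Println(healthAction(0.15, 0.5, 0.2)) // chicken
	fmt.Println(healthAction(0.40, 0.5, 0.2)) // potion
	fmt.Println(healthAction(0.90, 0.5, 0.2)) // none
}
```

Ordering matters: checking the chicken threshold first guarantees a near-death character exits rather than wasting a potion cooldown.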
77
plugins/d2r/detector.go Normal file
View file

@ -0,0 +1,77 @@
// Game state detection for D2R.
package d2r
import (
"image"
"git.cloonar.com/openclawd/iso-bot/pkg/plugin"
)
// Detector implements plugin.GameDetector for D2R.
type Detector struct {
config Config
}
// NewDetector creates a D2R state detector.
func NewDetector(config Config) *Detector {
return &Detector{config: config}
}
// DetectState analyzes a screenshot and returns the current game state.
func (d *Detector) DetectState(frame image.Image) plugin.GameState {
// Priority-based detection:
// 1. Check for loading screen
// 2. Check for main menu
// 3. Check for character select
// 4. Check for in-game (health orb visible)
// 5. Check for death screen
if d.isLoading(frame) {
return plugin.StateLoading
}
if d.isMainMenu(frame) {
return plugin.StateMainMenu
}
if d.isCharacterSelect(frame) {
return plugin.StateCharacterSelect
}
if d.isInGame(frame) {
vitals := d.ReadVitals(frame)
if vitals.HealthPct <= 0 {
return plugin.StateDead
}
return plugin.StateInGame
}
return plugin.StateUnknown
}
// ReadVitals reads health and mana from the orbs.
func (d *Detector) ReadVitals(frame image.Image) plugin.VitalStats {
// TODO: Analyze health/mana orb regions using color detection
return plugin.VitalStats{}
}
// IsInGame returns true if health orb is visible.
func (d *Detector) IsInGame(frame image.Image) bool {
return d.isInGame(frame)
}
func (d *Detector) isLoading(frame image.Image) bool {
// TODO: Check for loading screen (mostly black with loading bar)
return false
}
func (d *Detector) isMainMenu(frame image.Image) bool {
// TODO: Template match main menu elements
return false
}
func (d *Detector) isCharacterSelect(frame image.Image) bool {
// TODO: Template match character select screen
return false
}
func (d *Detector) isInGame(frame image.Image) bool {
// TODO: Check if health orb region contains red pixels
return false
}

View file

@ -0,0 +1,56 @@
# Default D2R loot filter rules.
# Rules are evaluated top-to-bottom; first match wins.
rules:
# High-value uniques — always alert
- name: "GG Uniques"
match:
type: unique
name_contains: "Shako"
action: alert
priority: 10
- name: "High Runes"
match:
type: rune
min_rarity: 20 # Vex+
action: alert
priority: 10
# Standard pickups
- name: "All Uniques"
match:
type: unique
action: pickup
priority: 8
- name: "All Sets"
match:
type: set
action: pickup
priority: 7
- name: "Mid Runes"
match:
type: rune
min_rarity: 10 # Lem+
action: pickup
priority: 6
- name: "GG Rare Bases (Diadem)"
match:
type: rare
base_type: "Diadem"
action: pickup
priority: 5
- name: "GG Rare Bases (Circlet)"
match:
type: rare
base_type: "Circlet"
action: pickup
priority: 5
# Ignore everything else
- name: "Default Ignore"
match: {}
action: ignore

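The `Condition` struct in `filter.go` also supports property-based matching (`has_property`, `property_gte`) that the default rules above do not exercise. A hypothetical rule using them would look like this (the base type and property values are illustrative, not part of this commit):

```yaml
# Hypothetical example — not part of the default filter.
- name: "High-defense Monarchs"
  match:
    type: rare
    base_type: "Monarch"
    property_gte:
      defense: 600
  action: pickup
  priority: 6
```

This only takes effect once the `property_gte` comparison in `RuleEngine.matches` is implemented.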
65
plugins/d2r/plugin.go Normal file
View file

@ -0,0 +1,65 @@
// Package d2r implements the Diablo II: Resurrected game plugin.
package d2r
import (
"image"
"git.cloonar.com/openclawd/iso-bot/pkg/plugin"
)
// Plugin implements plugin.Plugin for D2R.
type Plugin struct {
config Config
services plugin.EngineServices
detector *Detector
reader *Reader
}
// New creates a new D2R plugin with default config.
func New() *Plugin {
return &Plugin{
config: DefaultConfig(),
}
}
// Info returns plugin metadata.
func (p *Plugin) Info() plugin.PluginInfo {
return plugin.PluginInfo{
ID: "d2r",
Name: "Diablo II: Resurrected",
Version: "0.1.0",
Description: "Bot plugin for Diablo II: Resurrected — MF runs, rune farming, and more",
Resolution: image.Point{X: 1920, Y: 1080},
}
}
// Init initializes the plugin with engine services.
func (p *Plugin) Init(services plugin.EngineServices) error {
p.services = services
p.detector = NewDetector(p.config)
p.reader = NewReader(p.config)
return nil
}
// Detector returns the game state detector.
func (p *Plugin) Detector() plugin.GameDetector {
return p.detector
}
// Reader returns the screen reader.
func (p *Plugin) Reader() plugin.ScreenReader {
return p.reader
}
// Routines returns available farming routines.
func (p *Plugin) Routines() []plugin.Routine {
return []plugin.Routine{
// TODO: Initialize routines with plugin services
}
}
// DefaultLootFilter returns the default D2R loot filter.
func (p *Plugin) DefaultLootFilter() plugin.LootFilter {
// TODO: Return default rule engine
return nil
}

47
plugins/d2r/reader.go Normal file
View file

@ -0,0 +1,47 @@
// Screen reader for D2R — extracts game information from screenshots.
package d2r
import (
"image"
"git.cloonar.com/openclawd/iso-bot/pkg/plugin"
)
// Reader implements plugin.ScreenReader for D2R.
type Reader struct {
config Config
}
// NewReader creates a D2R screen reader.
func NewReader(config Config) *Reader {
return &Reader{config: config}
}
// FindItems detects item labels on the ground.
func (r *Reader) FindItems(frame image.Image) []plugin.DetectedItem {
// TODO: Detect colored item text labels
// - Gold text = unique
// - Green text = set
// - Yellow text = rare
// - Orange text = runeword / crafted
// - White/grey = normal/magic
return nil
}
// FindPortal locates a town portal on screen.
func (r *Reader) FindPortal(frame image.Image) (image.Point, bool) {
// TODO: Detect blue portal glow
return image.Point{}, false
}
// FindEnemies detects enemy positions.
func (r *Reader) FindEnemies(frame image.Image) []image.Point {
// TODO: Enemy health bar detection
return nil
}
// ReadText extracts text from a screen region.
func (r *Reader) ReadText(frame image.Image, region image.Rectangle) string {
// TODO: OCR on the given region
return ""
}

View file

@ -0,0 +1,135 @@
// Mephisto farming routine for D2R.
//
// Classic MF run: Create game → WP to Durance 2 → Teleport to Durance 3 →
// Moat trick Mephisto → Loot → TP to town → Stash → Exit → Repeat
package mephisto
import (
"context"
"fmt"
"sync"
"git.cloonar.com/openclawd/iso-bot/pkg/plugin"
)
// Phase represents the current phase of a Mephisto run.
type Phase string
const (
PhaseCreateGame Phase = "create_game"
PhaseTeleport Phase = "teleport_to_durance"
PhaseFindBoss Phase = "find_mephisto"
PhaseKill Phase = "kill"
PhaseLoot Phase = "loot"
PhaseTownPortal Phase = "town_portal"
PhaseStash Phase = "stash"
PhaseExitGame Phase = "exit_game"
)
// Routine implements plugin.Routine for Mephisto runs.
type Routine struct {
mu sync.RWMutex
services plugin.EngineServices
phase Phase
runCount int
}
// New creates a Mephisto routine.
func New(services plugin.EngineServices) *Routine {
return &Routine{services: services}
}
// Name returns the routine name.
func (r *Routine) Name() string { return "Mephisto" }
// Phase returns current phase for status display.
func (r *Routine) Phase() string {
r.mu.RLock()
defer r.mu.RUnlock()
return string(r.phase)
}
// Run executes one Mephisto run.
func (r *Routine) Run(ctx context.Context) error {
phases := []struct {
phase Phase
handler func(ctx context.Context) error
}{
{PhaseCreateGame, r.createGame},
{PhaseTeleport, r.teleportToDurance},
{PhaseFindBoss, r.findMephisto},
{PhaseKill, r.killMephisto},
{PhaseLoot, r.lootItems},
{PhaseTownPortal, r.townPortal},
{PhaseStash, r.stashItems},
{PhaseExitGame, r.exitGame},
}
for _, p := range phases {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
r.mu.Lock()
r.phase = p.phase
r.mu.Unlock()
r.services.Log("info", fmt.Sprintf("Phase: %s", p.phase))
if err := p.handler(ctx); err != nil {
return fmt.Errorf("phase %s failed: %w", p.phase, err)
}
r.services.Wait()
}
r.mu.Lock()
r.runCount++
r.mu.Unlock()
return nil
}
func (r *Routine) createGame(ctx context.Context) error {
// TODO: Navigate lobby → create game with randomized name
return nil
}
func (r *Routine) teleportToDurance(ctx context.Context) error {
// TODO: Open WP → Durance of Hate 2 → teleport to level 3 entrance
return nil
}
func (r *Routine) findMephisto(ctx context.Context) error {
// TODO: Teleport around Durance 3 to find Mephisto
// Position for moat trick (safe spot across the moat)
return nil
}
func (r *Routine) killMephisto(ctx context.Context) error {
// TODO: Cast offensive spells from moat trick position
// Monitor health, use potions if needed
return nil
}
func (r *Routine) lootItems(ctx context.Context) error {
// TODO: Teleport to body, detect items, pick up per loot filter
return nil
}
func (r *Routine) townPortal(ctx context.Context) error {
// TODO: Cast TP, click portal to go to town
return nil
}
func (r *Routine) stashItems(ctx context.Context) error {
// TODO: If inventory has items worth stashing, open stash and transfer
return nil
}
func (r *Routine) exitGame(ctx context.Context) error {
// TODO: ESC → Save & Exit
return nil
}

requirements.txt
View file

@ -1,33 +0,0 @@
# Core dependencies
opencv-python>=4.8.0
pytesseract>=0.3.10
mss>=9.0.1
pyautogui>=0.9.54
pynput>=1.7.6
Pillow>=10.0.0
numpy>=1.24.0
pyyaml>=6.0
# Web dashboard
fastapi>=0.103.0
uvicorn>=0.23.0
jinja2>=3.1.0
# Pathfinding
networkx>=3.1
# Utilities
python-dotenv>=1.0.0
colorlog>=6.7.0
psutil>=5.9.0
# Development
pytest>=7.4.0
pytest-cov>=4.1.0
black>=23.0.0
mypy>=1.5.0
flake8>=6.0.0
# Optional: Advanced computer vision
# scikit-image>=0.21.0
# scipy>=1.11.0


--- a/setup.py
+++ /dev/null
@@ -1,57 +0,0 @@
"""Setup script for ISO Bot - Isometric Game Bot Engine."""
from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
with open("requirements.txt", "r", encoding="utf-8") as fh:
requirements = [line.strip() for line in fh if line.strip() and not line.startswith("#")]
setup(
name="iso-bot",
version="0.1.0",
author="Hoid",
author_email="hoid@cloonar.com",
description="Screen-reading bot engine for isometric games, starting with Diablo II: Resurrected",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://git.cloonar.com/openclawd/iso-bot",
packages=find_packages(),
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Games/Entertainment",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Scientific/Engineering :: Image Recognition",
],
python_requires=">=3.11",
install_requires=requirements,
extras_require={
"dev": [
"pytest>=7.4.0",
"pytest-cov>=4.1.0",
"black>=23.0.0",
"mypy>=1.5.0",
"flake8>=6.0.0",
],
"advanced": [
"scikit-image>=0.21.0",
"scipy>=1.11.0",
],
},
entry_points={
"console_scripts": [
"iso-bot=games.d2r.game:main",
],
},
include_package_data=True,
package_data={
"games.d2r": ["templates/*", "config/*"],
},
)


@@ -1,19 +0,0 @@
"""Web dashboard for bot monitoring and control.
Provides a real-time web UI showing bot status, run statistics,
and configuration options.
"""
import logging
from typing import Optional
logger = logging.getLogger(__name__)
# TODO: FastAPI-based dashboard
# - Real-time bot status (running/paused/stopped)
# - Current routine and phase
# - Run statistics (count, items found, runtime)
# - Health/mana display
# - Start/stop/pause controls
# - Configuration editor
# - Log viewer