The MCP Hardware Access Library is a comprehensive Python framework that enables secure hardware control through the Model Context Protocol (MCP). It provides AI agents and automation systems with the ability to interact with physical devices across multiple platforms.
We now use Conda for dependency management to ensure consistent environments across different platforms.
To set up your environment:
    # Linux/macOS
    chmod +x setup_environment.sh
    ./setup_environment.sh

    # Windows
    setup_environment.bat
For more details, see INSTALLATION.md.
We've added convenient runner scripts that automatically detect if the UnitMCP package is installed and install it if needed:
    # Linux/macOS
    ./run.sh [options]

    # Windows
    run.bat [options]
These scripts will:
This is especially useful for:
All command-line options are passed through to the UnitMCP orchestrator, so you can use them exactly as you would with the standard command:
./run.sh --verbose --simulation true
For more details on installation and running options, see INSTALLATION.md.
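The detect-and-install behaviour described for the runner scripts can be sketched in Python. This is only an illustration of the idea, not the contents of `run.sh` or `run.bat`: the function names `ensure_installed` and `pip_install_editable` are hypothetical, and the editable install mirrors the `pip install -e .` command used elsewhere in this README.

```python
import importlib.util
import subprocess
import sys
from typing import Callable, Optional


def pip_install_editable(path: str = ".") -> None:
    """Install the project at `path` in editable mode with the current interpreter."""
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-e", path])


def ensure_installed(module_name: str,
                     installer: Optional[Callable[[], None]] = None) -> bool:
    """Install the package only if `module_name` cannot be imported.

    Returns True when an install was triggered and False when the module
    was already importable. `installer` is injectable so the check can be
    exercised without touching the environment (an assumption of this sketch).
    """
    if importlib.util.find_spec(module_name) is not None:
        return False
    (installer or pip_install_editable)()
    return True
```

Calling `ensure_installed("unitmcp")` before launching the orchestrator would then be a no-op on machines where the package is already present.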
The project has been reorganized to improve clarity and reduce duplication:
- docs/ directory with clear sections
- configs/ directory
- src/unitmcp/
See the Migration Guide for details on the changes and how to update your code.
Client System
- MCPHardwareClient: Async client for hardware control
- MCPShell: Interactive command-line interface
Server System
- MCPServer: Main server framework
- DeviceManager: Hardware device management
- SecurityManager: Authentication and authorization
Hardware Abstraction
Remote Control
Domain-Specific Language
Security Layer
Pipeline System
DSL System
Claude UnitMCP Plugin
    graph TD
        A[AI Agent / User] --> B[MCPHardwareClient]
        B --> C[MCP Protocol]
        C --> D[MCPServer]
        D --> E1[GPIO Server]
        D --> E2[Input Server]
        D --> E3[Audio Server]
        D --> E4[Camera Server]
        E1 --> F1[Hardware Drivers]
        E2 --> F1
        E3 --> F1
        E4 --> F1
        F1 --> G[Physical Hardware]
        H[Hardware Abstraction Layer] --> B
        H --> I1[LED Device]
        H --> I2[Button Device]
        H --> I3[Traffic Light Device]
        H --> I4[Display Device]
        I1 --> J[Device Factory]
        I2 --> J
        I3 --> J
        I4 --> J
        J --> F1
For more detailed architecture documentation, see the Architecture Documentation.
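The fan-out from the main server to the GPIO, input, audio, and camera servers can be pictured as routing on a method prefix. The sketch below assumes dotted method names such as `gpio.setupLED` (the form used in the pipeline example in this README); the `Router` class and its handler signature are hypothetical and are not the MCPServer implementation.

```python
from typing import Any, Callable, Dict

# A handler receives the action name (the part after the dot) and its parameters.
Handler = Callable[[str, Dict[str, Any]], Any]


class Router:
    """Hypothetical prefix router: 'gpio.setupLED' goes to the 'gpio' handler."""

    def __init__(self) -> None:
        self._servers: Dict[str, Handler] = {}

    def register(self, prefix: str, handler: Handler) -> None:
        self._servers[prefix] = handler

    def dispatch(self, method: str, params: Dict[str, Any]) -> Any:
        # Split 'prefix.action' once; a missing dot leaves `action` empty.
        prefix, _, action = method.partition(".")
        if not action or prefix not in self._servers:
            raise KeyError(f"no server registered for method {method!r}")
        return self._servers[prefix](action, params)


router = Router()
router.register("gpio", lambda action, params: ("gpio", action, params["pin"]))
print(router.dispatch("gpio.setupLED", {"pin": 17}))
```

Keeping the routing table separate from the handlers means a new server type only needs one `register` call.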
The project has been reorganized with the following structure:
    UnitMCP/
    ├── configs/  # Configuration files
    │   ├── env/  # Environment variables
    │   └── yaml/  # YAML configuration files
    │       ├── devices/  # Device configurations
    │       ├── automation/  # Automation configurations
    │       └── security/  # Security configurations
    ├── docs/  # Documentation
    │   ├── api/  # API documentation
    │   ├── architecture/  # Architecture documentation
    │   │   ├── diagrams/  # Architecture diagrams
    │   │   └── descriptions/  # Component descriptions
    │   ├── guides/  # User guides
    │   │   ├── installation/  # Installation guides
    │   │   ├── hardware/  # Hardware guides
    │   │   └── llm/  # LLM integration guides
    │   ├── examples/  # Example documentation
    │   └── development/  # Development documentation
    ├── docker/  # Docker configurations
    ├── examples/  # Example code
    │   ├── basic/  # Basic examples
    │   ├── platforms/  # Platform-specific examples
    │   ├── llm/  # LLM integration examples
    │   └── advanced/  # Advanced examples
    ├── src/  # Source code
    │   └── unitmcp/  # UnitMCP package
    │       ├── core/  # Core functionality
    │       ├── hardware/  # Hardware abstraction layer
    │       ├── communication/  # Communication protocols
    │       ├── dsl/  # Domain-specific language
    │       ├── llm/  # LLM integration
    │       ├── plugin/  # Plugin system
    │       ├── security/  # Security features
    │       ├── utils/  # Utility functions
    │       └── simulation/  # Simulation components
    └── tests/  # Tests
        ├── unit/  # Unit tests
        ├── integration/  # Integration tests
        ├── system/  # System tests
        └── performance/  # Performance tests
UnitMCP provides powerful capabilities for remote device control and real-time GPIO streaming:
The shell_cli module provides an interactive shell interface for controlling remote devices:
    # Start the interactive shell
    cd examples/shell_cli
    python shell_cli_demo.py --interactive

    # In the shell
    mcp> connect 192.168.1.100 8888
    mcp> gpio_setup 17 OUT
    mcp> led_setup led1 17
    mcp> led led1 on
For simpler implementations, a lightweight shell is also available:
    # Connect via TCP
    python simple_remote_shell.py --host 192.168.1.100 --port 8888

    # Connect via SSH (requires paramiko)
    python simple_remote_shell.py --host 192.168.1.100 --port 22 --ssh
The rpi_control module enables real-time streaming of GPIO pin states from a Raspberry Pi to a client PC:
    # On the Raspberry Pi
    cd examples/rpi_control
    python server.py --stream-gpio

    # On the client PC
    cd examples/rpi_control
    python client.py --host --monitor-gpio
This provides:
See the examples/rpi_control/README.md for detailed documentation.
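One common way to stream pin states is to send only the pins whose level changed since the previous reading. Whether rpi_control works this way is not stated here, so treat the following as a generic sketch; `pin_changes` and the snapshot format (a dict of pin number to level) are assumptions made for illustration.

```python
from typing import Dict, Iterable, Iterator, Tuple


def pin_changes(snapshots: Iterable[Dict[int, int]]) -> Iterator[Tuple[int, int]]:
    """Yield (pin, level) only when a pin's level differs from the previous snapshot."""
    previous: Dict[int, int] = {}
    for snapshot in snapshots:
        # Sort so the output order is stable regardless of dict ordering.
        for pin, level in sorted(snapshot.items()):
            if previous.get(pin) != level:
                yield pin, level
        previous = dict(snapshot)


readings = [{17: 0, 18: 1}, {17: 1, 18: 1}, {17: 1, 18: 1}]
for pin, level in pin_changes(readings):
    print(f"GPIO {pin} -> {level}")
```

The first snapshot reports every pin, later snapshots report only differences, and an unchanged snapshot produces no traffic at all.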
    mcp> led_setup led1 17
    mcp> led led1 on
    mcp> type "Hello from MCP!"
    mcp> pipeline_create automation
    mcp> pipeline_run automation
    steps = [
        PipelineStep("setup", "gpio.setupLED", {"pin": 17}),
        PipelineStep("blink", "gpio.controlLED", {"action": "blink"}),
        PipelineStep("wait", "system.sleep", {"duration": 5})
    ]
    pipeline = Pipeline("demo", steps)
    await pipeline.execute(client)
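To make the step/pipeline idea concrete, here is a minimal stand-in that runs named steps in order against a client. The classes `SketchStep`, `SketchPipeline`, and `RecordingClient`, and the `send_request` method, are hypothetical and exist only for this sketch; the real `Pipeline` and `PipelineStep` classes may behave differently (for example around error handling or conditions).

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class SketchStep:
    """One named call: a dotted method name plus its parameters."""
    name: str
    method: str
    params: Dict[str, Any] = field(default_factory=dict)


class SketchPipeline:
    """Runs steps sequentially and collects each result under the step name."""

    def __init__(self, name: str, steps: List[SketchStep]) -> None:
        self.name = name
        self.steps = steps

    async def execute(self, client: "RecordingClient") -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        for step in self.steps:
            results[step.name] = await client.send_request(step.method, step.params)
        return results


class RecordingClient:
    """Fake client that records calls instead of talking to hardware."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    async def send_request(self, method: str, params: Dict[str, Any]) -> Dict[str, str]:
        self.calls.append((method, params))
        return {"status": "ok"}


async def main() -> None:
    steps = [
        SketchStep("setup", "gpio.setupLED", {"pin": 17}),
        SketchStep("blink", "gpio.controlLED", {"action": "blink"}),
    ]
    print(await SketchPipeline("demo", steps).execute(RecordingClient()))


if __name__ == "__main__":
    asyncio.run(main())
```

Passing the client into `execute` rather than storing it on the pipeline lets the same step list run against a simulated or a real connection.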
    mcp-hardware/
    ├── audio/  # Audio-related scripts
    ├── build/  # Build-related files
    ├── hardware/  # Hardware configuration scripts
    ├── install/  # Installation scripts and utilities
    ├── misc/  # Miscellaneous utilities
    ├── nlp/  # Natural Language Processing scripts
    ├── python/  # Python-related utilities
    ├── rpi/  # Raspberry Pi specific scripts
    ├── service/  # Service setup scripts
    ├── ssh/  # SSH connection utilities
    ├── test/  # Testing utilities
    ├── update/  # Update and upgrade scripts
    ├── src/unitmcp/  # Main package
    │   ├── client/  # Client implementations
    │   ├── server/  # Hardware servers
    │   ├── pipeline/  # Pipeline system
    │   ├── protocols/  # MCP protocol
    │   ├── security/  # Permission system
    │   ├── hardware/  # Hardware abstraction layer
    │   │   ├── base.py  # Base device classes and interfaces
    │   │   ├── led.py  # LED device implementation
    │   │   ├── button.py  # Button device implementation
    │   │   ├── traffic_light.py  # Traffic light device implementation
    │   │   ├── display.py  # Display device implementation
    │   │   └── device_factory.py  # Device factory implementation
    │   └── utils/  # Utilities
    ├── examples/  # Usage examples
    │   ├── Basic Controls  # LED, keyboard, mouse
    │   ├── Automation  # Pipelines, scripts
    │   ├── AI Integration  # Ollama, voice
    │   ├── Hardware Abstraction  # Device abstraction examples
    │   └── Complete Systems  # Traffic light, security
    └── tests/  # Test suite
    git clone https://github.com/example/mcp-hardware.git
    cd mcp-hardware
    pip install -e .

    # Set up hardware components on a local Raspberry Pi
    python rpi_control/setup/setup_all.py --component oled

    # Set up hardware components on a remote Raspberry Pi
    python rpi_control/setup/remote_setup.py --host raspberrypi.local --component oled

    # Run setup in simulation mode (no physical hardware or sudo required)
    python rpi_control/setup/remote_setup.py --host raspberrypi.local --component oled --simulation
python examples/start_server.py
    # Basic LED control
    python examples/led_control.py

    # Interactive shell
    python -m unitmcp.client.shell

    # Pipeline automation
    python examples/pipeline_demo.py

    # Hardware abstraction layer demo
    python examples/hardware_example.py

    # Create and use LED device
    from unitmcp.hardware.led import LEDDevice
    from unitmcp.hardware.base import DeviceMode

    # Create an LED device in simulation mode
    led = LEDDevice(device_id="my_led", pin=17, mode=DeviceMode.SIMULATION)

    # Initialize the device
    await led.initialize()

    # Control the LED
    await led.activate()    # Turn on
    await led.deactivate()  # Turn off
    await led.blink(on_time=0.5, off_time=0.5, count=5)  # Blink 5 times

    # Using the device factory
    from unitmcp.hardware.device_factory import create_device
    from unitmcp.hardware.base import DeviceType

    # Create a device using the factory
    button = await create_device(
        factory_type="simulation",
        device_id="my_button",
        device_type=DeviceType.BUTTON,
        pin=27
    )

    # Clean up when done
    await led.cleanup()
    await button.cleanup()
AI Agent -> MCP Client -> MCP Servers -> Hardware Drivers
This example demonstrates controlling an LED using the hardware abstraction layer:
    import asyncio
    from unitmcp.hardware.led import LEDDevice
    from unitmcp.hardware.base import DeviceMode

    async def led_example():
        # Create an LED device in simulation mode
        led = LEDDevice(device_id="example_led", pin=17, mode=DeviceMode.SIMULATION)

        # Initialize the device
        await led.initialize()

        # Basic control
        print("Turning LED on")
        await led.activate()
        await asyncio.sleep(1)

        print("Turning LED off")
        await led.deactivate()
        await asyncio.sleep(1)

        # Brightness control
        print("Setting LED brightness to 50%")
        await led.set_brightness(0.5)
        await asyncio.sleep(1)

        # Blinking
        print("Blinking LED (3 times)")
        await led.blink(count=3)

        # Cleanup
        await led.cleanup()

    if __name__ == "__main__":
        asyncio.run(led_example())
This example shows how to use a button with event callbacks:
    import asyncio
    from unitmcp.hardware.button import ButtonDevice
    from unitmcp.hardware.base import DeviceMode

    async def button_example():
        # Create a button device in simulation mode
        button = ButtonDevice(device_id="example_button", pin=27, mode=DeviceMode.SIMULATION)

        # Initialize the device
        await button.initialize()

        # Register event callbacks
        def on_pressed(event, data):
            print(f"Button pressed! Event: {event}, Data: {data}")

        def on_released(event, data):
            print(f"Button released! Event: {event}, Data: {data}")
            print(f"Press duration: {data.get('duration', 0):.2f} seconds")

        button.register_event_callback("pressed", on_pressed)
        button.register_event_callback("released", on_released)

        # Simulate button presses
        print("Simulating button press (short)")
        await button.simulate_press(duration=0.2)
        await asyncio.sleep(0.5)

        print("Simulating button press (long)")
        await button.simulate_press(duration=1.0)

        # Cleanup
        await button.cleanup()

    if __name__ == "__main__":
        asyncio.run(button_example())
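The callback registration used in the button example follows the familiar observer pattern. As a rough sketch of how such a registry can work, the hypothetical `EventEmitter` below keeps a list of callbacks per event name and calls each with `(event, data)`; it is not the ButtonDevice implementation, and the `emit` method is an assumption of this sketch.

```python
from typing import Any, Callable, Dict, List

Callback = Callable[[str, Dict[str, Any]], None]


class EventEmitter:
    """Hypothetical per-event callback registry."""

    def __init__(self) -> None:
        self._callbacks: Dict[str, List[Callback]] = {}

    def register_event_callback(self, event: str, callback: Callback) -> None:
        self._callbacks.setdefault(event, []).append(callback)

    def emit(self, event: str, data: Dict[str, Any]) -> int:
        """Call every callback registered for `event`; return how many ran."""
        callbacks = list(self._callbacks.get(event, []))
        for callback in callbacks:
            callback(event, data)
        return len(callbacks)


emitter = EventEmitter()
emitter.register_event_callback(
    "released", lambda event, data: print(f"{event}: {data['duration']:.2f} s")
)
emitter.emit("released", {"duration": 0.2})
```

Copying the callback list before iterating means a callback can safely register further callbacks while an event is being delivered.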
This example demonstrates controlling a traffic light sequence:
    import asyncio
    from unitmcp.hardware.traffic_light import TrafficLightDevice, TrafficLightState
    from unitmcp.hardware.base import DeviceMode

    async def traffic_light_example():
        # Create a traffic light device in simulation mode
        traffic_light = TrafficLightDevice(
            device_id="example_traffic_light",
            red_pin=17,
            yellow_pin=18,
            green_pin=27,
            mode=DeviceMode.SIMULATION
        )

        # Initialize the device
        await traffic_light.initialize()

        # Register state change callback
        def on_state_changed(event, data):
            print(f"Traffic light state changed to: {data.get('state')}")

        traffic_light.register_event_callback("state_changed", on_state_changed)

        # Manual state control
        print("Setting traffic light to RED")
        await traffic_light.set_state(TrafficLightState.RED)
        await asyncio.sleep(2)

        print("Setting traffic light to YELLOW")
        await traffic_light.set_state(TrafficLightState.YELLOW)
        await asyncio.sleep(2)

        print("Setting traffic light to GREEN")
        await traffic_light.set_state(TrafficLightState.GREEN)
        await asyncio.sleep(2)

        # Start automatic cycling
        print("Starting automatic cycle")
        await traffic_light.start_cycle()

        # Let it cycle for a while
        await asyncio.sleep(10)

        # Stop cycling
        print("Stopping cycle")
        await traffic_light.stop_cycle()

        # Turn off all lights
        print("Turning off all lights")
        await traffic_light.set_state(TrafficLightState.OFF)

        # Cleanup
        await traffic_light.cleanup()

    if __name__ == "__main__":
        asyncio.run(traffic_light_example())
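An automatic cycle is essentially a repeating sequence of states. The sketch below uses the RED -> GREEN -> YELLOW -> RED order that the controller example in this README names as its normal cycle; the `LightState` enum and `cycle_states` helper are hypothetical, and the actual behaviour of `start_cycle()` (timings, order) is not documented here.

```python
from enum import Enum
from itertools import cycle, islice
from typing import Iterator


class LightState(Enum):
    """Hypothetical stand-in for the library's traffic light states."""
    RED = "RED"
    YELLOW = "YELLOW"
    GREEN = "GREEN"
    OFF = "OFF"


# Order assumed from the "Normal cycle" comment in the controller example.
CYCLE_ORDER = (LightState.RED, LightState.GREEN, LightState.YELLOW)


def cycle_states(count: int) -> Iterator[LightState]:
    """Yield the first `count` states of the repeating cycle."""
    return islice(cycle(CYCLE_ORDER), count)


for state in cycle_states(4):
    print(state.name)
```

A real controller would sleep between states and apply each one with `set_state`; the generator only fixes the order.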
This example shows how to use a display device:
    import asyncio
    from unitmcp.hardware.display import DisplayDevice, DisplayType
    from unitmcp.hardware.base import DeviceMode

    async def display_example():
        # Create a display device in simulation mode
        display = DisplayDevice(
            device_id="example_display",
            display_type=DisplayType.LCD,
            width=16,
            height=2,
            mode=DeviceMode.SIMULATION
        )

        # Initialize the device
        await display.initialize()

        # Write text to display
        print("Writing text to display")
        await display.clear()
        await display.write_line("Hello, UnitMCP!", line=0)
        await display.write_line("Hardware Demo", line=1)
        await asyncio.sleep(2)

        # Update content with a counter
        print("Updating display content")
        await display.clear()
        await display.write_text("Count: ", position=(0, 0))
        for i in range(5):
            await display.set_cursor(position=(7, 0))
            await display.write_text(str(i))
            await asyncio.sleep(0.5)

        # Toggle backlight
        print("Toggling backlight")
        await display.set_backlight(False)
        await asyncio.sleep(1)
        await display.set_backlight(True)

        # Cleanup
        await display.cleanup()

    if __name__ == "__main__":
        asyncio.run(display_example())
This example demonstrates using the device factory to create devices:
    import asyncio
    from unitmcp.hardware.device_factory import create_device, create_devices_from_config
    from unitmcp.hardware.base import DeviceType, DeviceMode

    async def factory_example():
        # Create devices directly using the factory
        print("Creating devices using factory")
        led = await create_device(
            factory_type="simulation",
            device_id="factory_led",
            device_type=DeviceType.LED,
            pin=17
        )

        button = await create_device(
            factory_type="simulation",
            device_id="factory_button",
            device_type=DeviceType.BUTTON,
            pin=27
        )

        # Use the created devices
        print("Using factory-created devices")
        await led.activate()
        await asyncio.sleep(1)
        await led.deactivate()

        # Create devices from configuration
        print("Creating devices from configuration")
        config = {
            "devices": {
                "config_led": {
                    "type": "led",
                    "pin": 22
                },
                "config_button": {
                    "type": "button",
                    "pin": 23
                },
                "config_traffic_light": {
                    "type": "traffic_light",
                    "red_pin": 24,
                    "yellow_pin": 25,
                    "green_pin": 26
                }
            }
        }

        devices = await create_devices_from_config(
            config=config,
            factory_type="simulation"
        )

        # Use a device from the configuration
        traffic_light = devices["config_traffic_light"]
        await traffic_light.set_state("GREEN")
        await asyncio.sleep(1)
        await traffic_light.set_state("OFF")

        # Cleanup all devices
        for device in [led, button, *devices.values()]:
            await device.cleanup()

    if __name__ == "__main__":
        asyncio.run(factory_example())
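Building devices from a configuration dictionary usually comes down to a lookup table from the `type` string to a class. The following sketch shows that idea with hypothetical `SimulatedLED` and `SimulatedButton` classes and a synchronous `build_devices` helper; the real `create_devices_from_config` is asynchronous and its internals are not described in this README.

```python
from typing import Any, Dict, Type


class SimulatedDevice:
    """Hypothetical base class that just remembers its id and options."""

    def __init__(self, device_id: str, **options: Any) -> None:
        self.device_id = device_id
        self.options = options


class SimulatedLED(SimulatedDevice):
    pass


class SimulatedButton(SimulatedDevice):
    pass


# Maps the "type" field of a device entry to the class that implements it.
REGISTRY: Dict[str, Type[SimulatedDevice]] = {
    "led": SimulatedLED,
    "button": SimulatedButton,
}


def build_devices(config: Dict[str, Any]) -> Dict[str, SimulatedDevice]:
    """Create one device per entry under config['devices']."""
    devices: Dict[str, SimulatedDevice] = {}
    for device_id, spec in config["devices"].items():
        options = dict(spec)          # copy so the caller's config is untouched
        kind = options.pop("type")
        if kind not in REGISTRY:
            raise ValueError(f"unknown device type {kind!r} for {device_id!r}")
        devices[device_id] = REGISTRY[kind](device_id, **options)
    return devices


devices = build_devices({"devices": {"config_led": {"type": "led", "pin": 22}}})
print(type(devices["config_led"]).__name__, devices["config_led"].options)
```

Everything except `type` is forwarded as keyword arguments, which is why each device entry can carry its own set of pins and options.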
This example demonstrates a complete interactive system that combines multiple hardware devices to create a traffic light controller with button input and display feedback:
    import asyncio
    import logging
    import yaml
    from unitmcp.hardware.device_factory import create_devices_from_config
    from unitmcp.hardware.traffic_light import TrafficLightState
    from unitmcp.utils.env_loader import EnvLoader

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)

    # Sample configuration
    CONFIG = """
    devices:
      traffic_light:
        type: traffic_light
        red_pin: 17
        yellow_pin: 18
        green_pin: 27
      pedestrian_button:
        type: button
        pin: 22
        pull_up: true
        debounce_ms: 50
      status_display:
        type: display
        display_type: lcd
        width: 16
        height: 2
        i2c_address: 0x27
    """

    class TrafficLightController:
        def __init__(self):
            self.devices = None
            self.running = False
            self.pedestrian_waiting = False
            self.current_state = None
            self.env = EnvLoader()

        async def initialize(self):
            """Initialize all devices from configuration"""
            # Load configuration
            config = yaml.safe_load(CONFIG)

            # Determine mode based on environment
            factory_type = "hardware"
            if self.env.get_bool("SIMULATION_MODE", True):
                factory_type = "simulation"
                logger.info("Running in SIMULATION mode")
            else:
                logger.info("Running in HARDWARE mode")

            # Create all devices
            self.devices = await create_devices_from_config(
                config=config,
                factory_type=factory_type
            )

            # Get individual devices
            self.traffic_light = self.devices["traffic_light"]
            self.button = self.devices["pedestrian_button"]
            self.display = self.devices["status_display"]

            # Register button callback
            self.button.register_event_callback("pressed", self.on_button_pressed)

            # Register traffic light state change callback
            self.traffic_light.register_event_callback("state_changed", self.on_state_changed)

            # Initialize display
            await self.display.clear()
            await self.display.write_line("Traffic System", line=0)
            await self.display.write_line("Initializing...", line=1)

            logger.info("Traffic light controller initialized")
            return True

        async def on_button_pressed(self, event, data):
            """Handle pedestrian button press"""
            logger.info(f"Pedestrian button pressed at {data.get('timestamp')}")

            # Set pedestrian waiting flag
            self.pedestrian_waiting = True

            # Update display
            await self.display.set_cursor(position=(0, 1))
            await self.display.write_line("Pedestrian wait", line=1)

        async def on_state_changed(self, event, data):
            """Handle traffic light state changes"""
            state = data.get('state')
            self.current_state = state
            logger.info(f"Traffic light changed to {state}")

            # Update display with current state
            await self.display.set_cursor(position=(0, 0))
            await self.display.write_line(f"State: {state} ", line=0)

        async def run_traffic_cycle(self):
            """Run the main traffic light cycle"""
            self.running = True

            # Initial state
            await self.display.clear()
            await self.display.write_line("State: STARTING", line=0)
            await self.display.write_line("System ready", line=1)

            # Start with red
            await self.traffic_light.set_state(TrafficLightState.RED)
            await asyncio.sleep(2)

            while self.running:
                # Normal cycle: RED -> GREEN -> YELLOW -> RED

                # GREEN phase
                await self.traffic_light.set_state(TrafficLightState.GREEN)

                # Stay green for a while, but check for pedestrian button presses
                green_time = 0
                while green_time [...]

    sequenceDiagram
        Client->>MCPServer: connect()
        MCPServer->>Client: connection_established
        Client->>MCPServer: control_device(device_id, command)
        MCPServer->>DeviceManager: get_device(device_id)
        DeviceManager->>MCPServer: device
        MCPServer->>Device: execute_command(command)
        Device->>MCPServer: result
        MCPServer->>Client: command_result
    flowchart TD
        A[Start] --> B{Is Hardware Available?}
        B -->|Yes| C[Initialize Hardware]
        B -->|No| D[Start Simulation]
        C --> E[Setup Devices]
        D --> E
        E --> F[Start Server]
        F --> G[Wait for Connections]
        G --> H{Connection Request?}
        H -->|Yes| I[Handle Connection]
        H -->|No| G
        I --> J[Process Commands]
        J --> G
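The first decision in the startup flowchart, falling back to simulation when no hardware is available, can be sketched as a small function. The `SIMULATION_MODE` variable name comes from the controller example in this README; the helper names `env_bool` and `choose_factory_type`, the accepted truthy strings, and the idea that the variable overrides hardware detection are assumptions made for this sketch.

```python
import os
from typing import Callable, Mapping

TRUE_VALUES = {"1", "true", "yes", "on"}


def env_bool(name: str, default: bool, environ: Mapping[str, str] = os.environ) -> bool:
    """Read a boolean flag from the environment, falling back to `default`."""
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in TRUE_VALUES


def choose_factory_type(hardware_available: Callable[[], bool],
                        environ: Mapping[str, str] = os.environ) -> str:
    """Return 'simulation' when forced by SIMULATION_MODE or when no hardware is found."""
    if env_bool("SIMULATION_MODE", False, environ):
        return "simulation"
    return "hardware" if hardware_available() else "simulation"


# The probe is passed in so the decision can be tested without real hardware.
print(choose_factory_type(lambda: False, {}))
```

The returned string matches the `factory_type` values (`"hardware"`, `"simulation"`) used by the device factory examples.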
    classDiagram
        class Device {
            +String device_id
            +DeviceType type
            +Boolean is_connected
            +connect()
            +disconnect()
            +execute_command()
        }
        class LEDDevice {
            +int pin
            +Boolean state
            +turn_on()
            +turn_off()
            +blink()
        }
        class ButtonDevice {
            +int pin
            +Boolean state
            +read_state()
            +wait_for_press()
        }
        Device <|-- LEDDevice
        Device <|-- ButtonDevice

    stateDiagram-v2
        [*] --> Disconnected
        Disconnected --> Connecting: connect()
        Connecting --> Connected: success
        Connecting --> Error: failure
        Connected --> Processing: receive_command()
        Processing --> Connected: command_complete
        Connected --> Disconnecting: disconnect()
        Error --> Disconnected: reset()
        Disconnecting --> Disconnected: complete
        Disconnected --> [*]
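The connection states and transitions in the state diagram above translate directly into a lookup table. The sketch below encodes exactly the transitions listed there; the function names `next_state` and `run_events` and the plain-string event names (with the parentheses dropped) are choices made for this illustration.

```python
from typing import Dict, Iterable, Tuple

# (current state, event) -> next state, as listed in the state diagram.
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("Disconnected", "connect"): "Connecting",
    ("Connecting", "success"): "Connected",
    ("Connecting", "failure"): "Error",
    ("Connected", "receive_command"): "Processing",
    ("Processing", "command_complete"): "Connected",
    ("Connected", "disconnect"): "Disconnecting",
    ("Error", "reset"): "Disconnected",
    ("Disconnecting", "complete"): "Disconnected",
}


def next_state(state: str, event: str) -> str:
    """Return the state reached from `state` on `event`, or raise if not allowed."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"event {event!r} is not valid in state {state!r}") from None


def run_events(events: Iterable[str], state: str = "Disconnected") -> str:
    """Apply a sequence of events starting from `state` and return the final state."""
    for event in events:
        state = next_state(state, event)
    return state


print(run_events(["connect", "success", "receive_command", "command_complete"]))
```

Rejecting unknown `(state, event)` pairs makes invalid sequences, such as sending a command before connecting, fail loudly instead of being ignored.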
    erDiagram
        SERVER ||--o{ DEVICE : manages
        SERVER {
            string server_id
            int port
            boolean ssl_enabled
        }
        DEVICE {
            string device_id
            string type
            boolean is_connected
        }
        DEVICE ||--o{ PROPERTY : has
        PROPERTY {
            string name
            string value
            string data_type
        }
        USER ||--o{ SERVER : connects
        USER {
            string username
            string password_hash
            string role
        }
    gantt
        title UnitMCP Development Timeline
        dateFormat YYYY-MM-DD
        section Planning
        Requirements Analysis :a1, 2025-01-01, 30d
        Architecture Design :a2, after a1, 45d
        section Development
        Core Framework :d1, after a2, 60d
        Hardware Abstraction :d2, after a2, 45d
        Server Implementation :d3, after d1, 30d
        section Testing
        Unit Testing :t1, after d2, 20d
        Integration Testing :t2, after d3, 30d
        User Acceptance Testing :t3, after t2, 15d
        section Deployment
        Documentation :p1, after t3, 15d
        Release :milestone, after p1, 0d
    pie
        title UnitMCP Component Distribution
        "Hardware Abstraction" : 30
        "Server Framework" : 25
        "Client Libraries" : 15
        "Documentation" : 10
        "Testing" : 15
        "Utilities" : 5
    gitGraph
        commit
        branch develop
        checkout develop
        commit
        commit
        branch feature/hardware
        checkout feature/hardware
        commit
        commit
        checkout develop
        merge feature/hardware
        branch feature/server
        checkout feature/server
        commit
        checkout develop
        merge feature/server
        checkout main
        merge develop
        commit
    journey
        title UnitMCP User Journey
        section Installation
            Download Package: 5: User
            Install Dependencies: 3: User
            Run Setup Script: 4: User
        section Configuration
            Edit Config File: 3: User
            Set Up Devices: 4: User
            Configure Network: 3: User
        section Usage
            Start Server: 5: User
            Connect Client: 4: User
            Control Devices: 5: User
These diagrams can be used throughout the documentation to visualize system architecture, processes, and relationships between components.