Add homeassistant skill in unpacked format
- Add complete homeassistant skill source to skills/ directory - Includes all scripts, references, and automation templates - Matches format of other skills in repository
This commit is contained in:
172
skills/homeassistant/scripts/api_client.py
Executable file
172
skills/homeassistant/scripts/api_client.py
Executable file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Home Assistant REST API Client
|
||||
|
||||
This script provides utilities to interact with Home Assistant via the REST API.
|
||||
Supports getting states, calling services, and querying configuration.
|
||||
|
||||
Usage:
|
||||
# Get all entity states
|
||||
python3 api_client.py --url http://homeassistant.local:8123 --token YOUR_TOKEN get-states
|
||||
|
||||
# Get specific entity state
|
||||
python3 api_client.py --url http://homeassistant.local:8123 --token YOUR_TOKEN get-state light.living_room
|
||||
|
||||
# Call a service
|
||||
python3 api_client.py --url http://homeassistant.local:8123 --token YOUR_TOKEN call-service light turn_on --entity light.living_room --data '{"brightness": 255}'
|
||||
|
||||
# List all services
|
||||
python3 api_client.py --url http://homeassistant.local:8123 --token YOUR_TOKEN list-services
|
||||
|
||||
Environment variables:
|
||||
HA_URL: Home Assistant URL (e.g., http://homeassistant.local:8123)
|
||||
HA_TOKEN: Long-lived access token
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from typing import Any, Dict, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
|
||||
|
||||
class HomeAssistantClient:
|
||||
"""Client for interacting with Home Assistant REST API."""
|
||||
|
||||
def __init__(self, url: str, token: str):
|
||||
"""Initialize the client with URL and access token."""
|
||||
self.url = url.rstrip('/')
|
||||
self.token = token
|
||||
self.headers = {
|
||||
'Authorization': f'Bearer {token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
def _request(self, endpoint: str, method: str = 'GET', data: Optional[Dict] = None) -> Any:
|
||||
"""Make a request to the Home Assistant API."""
|
||||
url = f"{self.url}/api/{endpoint}"
|
||||
|
||||
req = urllib.request.Request(url, headers=self.headers, method=method)
|
||||
|
||||
if data is not None:
|
||||
req.data = json.dumps(data).encode('utf-8')
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req) as response:
|
||||
return json.loads(response.read().decode('utf-8'))
|
||||
except urllib.error.HTTPError as e:
|
||||
error_body = e.read().decode('utf-8')
|
||||
print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
|
||||
print(f"Response: {error_body}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except urllib.error.URLError as e:
|
||||
print(f"URL Error: {e.reason}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
def get_states(self) -> list:
|
||||
"""Get all entity states."""
|
||||
return self._request('states')
|
||||
|
||||
def get_state(self, entity_id: str) -> Dict:
|
||||
"""Get state of a specific entity."""
|
||||
return self._request(f'states/{entity_id}')
|
||||
|
||||
def get_services(self) -> Dict:
|
||||
"""Get all available services."""
|
||||
return self._request('services')
|
||||
|
||||
def get_config(self) -> Dict:
|
||||
"""Get Home Assistant configuration."""
|
||||
return self._request('config')
|
||||
|
||||
def call_service(self, domain: str, service: str, entity_id: Optional[str] = None,
|
||||
service_data: Optional[Dict] = None) -> list:
|
||||
"""Call a service."""
|
||||
data = {}
|
||||
if entity_id:
|
||||
data['entity_id'] = entity_id
|
||||
if service_data:
|
||||
data.update(service_data)
|
||||
|
||||
return self._request(f'services/{domain}/{service}', method='POST', data=data if data else None)
|
||||
|
||||
def get_error_log(self) -> str:
|
||||
"""Get the error log."""
|
||||
url = f"{self.url}/api/error_log"
|
||||
req = urllib.request.Request(url, headers=self.headers)
|
||||
with urllib.request.urlopen(req) as response:
|
||||
return response.read().decode('utf-8')
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Home Assistant REST API Client')
|
||||
parser.add_argument('--url', help='Home Assistant URL',
|
||||
default=os.getenv('HA_URL'))
|
||||
parser.add_argument('--token', help='Long-lived access token',
|
||||
default=os.getenv('HA_TOKEN'))
|
||||
|
||||
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
|
||||
|
||||
# get-states command
|
||||
subparsers.add_parser('get-states', help='Get all entity states')
|
||||
|
||||
# get-state command
|
||||
state_parser = subparsers.add_parser('get-state', help='Get specific entity state')
|
||||
state_parser.add_argument('entity_id', help='Entity ID')
|
||||
|
||||
# list-services command
|
||||
subparsers.add_parser('list-services', help='List all available services')
|
||||
|
||||
# get-config command
|
||||
subparsers.add_parser('get-config', help='Get Home Assistant configuration')
|
||||
|
||||
# call-service command
|
||||
service_parser = subparsers.add_parser('call-service', help='Call a service')
|
||||
service_parser.add_argument('domain', help='Service domain (e.g., light, switch)')
|
||||
service_parser.add_argument('service', help='Service name (e.g., turn_on, turn_off)')
|
||||
service_parser.add_argument('--entity', help='Entity ID to target')
|
||||
service_parser.add_argument('--data', help='Service data as JSON string')
|
||||
|
||||
# get-error-log command
|
||||
subparsers.add_parser('get-error-log', help='Get the error log')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.url:
|
||||
print("Error: Home Assistant URL is required (use --url or set HA_URL environment variable)", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not args.token:
|
||||
print("Error: Access token is required (use --token or set HA_TOKEN environment variable)", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
client = HomeAssistantClient(args.url, args.token)
|
||||
|
||||
if args.command == 'get-states':
|
||||
result = client.get_states()
|
||||
elif args.command == 'get-state':
|
||||
result = client.get_state(args.entity_id)
|
||||
elif args.command == 'list-services':
|
||||
result = client.get_services()
|
||||
elif args.command == 'get-config':
|
||||
result = client.get_config()
|
||||
elif args.command == 'call-service':
|
||||
service_data = json.loads(args.data) if args.data else None
|
||||
result = client.call_service(args.domain, args.service, args.entity, service_data)
|
||||
elif args.command == 'get-error-log':
|
||||
result = client.get_error_log()
|
||||
print(result)
|
||||
return
|
||||
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
88
skills/homeassistant/scripts/list_entities.py
Executable file
88
skills/homeassistant/scripts/list_entities.py
Executable file
@@ -0,0 +1,88 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
List all Home Assistant entities and their states
|
||||
|
||||
This script queries Home Assistant and displays all entities with their current states.
|
||||
Can filter by domain (e.g., light, switch, sensor) and format output.
|
||||
|
||||
Usage:
|
||||
# List all entities
|
||||
python3 list_entities.py --url http://homeassistant.local:8123 --token YOUR_TOKEN
|
||||
|
||||
# List only lights
|
||||
python3 list_entities.py --url http://homeassistant.local:8123 --token YOUR_TOKEN --domain light
|
||||
|
||||
# Output as JSON
|
||||
python3 list_entities.py --url http://homeassistant.local:8123 --token YOUR_TOKEN --format json
|
||||
|
||||
Environment variables:
|
||||
HA_URL: Home Assistant URL
|
||||
HA_TOKEN: Long-lived access token
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
|
||||
def get_entities(url: str, token: str):
|
||||
"""Get all entity states from Home Assistant."""
|
||||
api_url = f"{url.rstrip('/')}/api/states"
|
||||
headers = {
|
||||
'Authorization': f'Bearer {token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
req = urllib.request.Request(api_url, headers=headers)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req) as response:
|
||||
return json.loads(response.read().decode('utf-8'))
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='List Home Assistant entities')
|
||||
parser.add_argument('--url', help='Home Assistant URL',
|
||||
default=os.getenv('HA_URL'))
|
||||
parser.add_argument('--token', help='Long-lived access token',
|
||||
default=os.getenv('HA_TOKEN'))
|
||||
parser.add_argument('--domain', help='Filter by domain (e.g., light, switch, sensor)')
|
||||
parser.add_argument('--format', choices=['table', 'json', 'list'],
|
||||
default='table', help='Output format')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.url or not args.token:
|
||||
print("Error: URL and token are required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
entities = get_entities(args.url, args.token)
|
||||
|
||||
# Filter by domain if specified
|
||||
if args.domain:
|
||||
entities = [e for e in entities if e['entity_id'].startswith(f"{args.domain}.")]
|
||||
|
||||
if args.format == 'json':
|
||||
print(json.dumps(entities, indent=2))
|
||||
elif args.format == 'list':
|
||||
for entity in entities:
|
||||
print(entity['entity_id'])
|
||||
else: # table format
|
||||
print(f"{'Entity ID':<40} {'State':<20} {'Last Changed':<20}")
|
||||
print("-" * 80)
|
||||
for entity in entities:
|
||||
entity_id = entity['entity_id']
|
||||
state = entity['state']
|
||||
last_changed = entity.get('last_changed', 'N/A')[:19] # Truncate timestamp
|
||||
print(f"{entity_id:<40} {state:<20} {last_changed:<20}")
|
||||
|
||||
print(f"\nTotal: {len(entities)} entities")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
95
skills/homeassistant/scripts/list_services.py
Executable file
95
skills/homeassistant/scripts/list_services.py
Executable file
@@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
List all available Home Assistant services
|
||||
|
||||
This script queries Home Assistant and displays all available services organized by domain.
|
||||
|
||||
Usage:
|
||||
# List all services
|
||||
python3 list_services.py --url http://homeassistant.local:8123 --token YOUR_TOKEN
|
||||
|
||||
# List services for specific domain
|
||||
python3 list_services.py --url http://homeassistant.local:8123 --token YOUR_TOKEN --domain light
|
||||
|
||||
# Output as JSON
|
||||
python3 list_services.py --url http://homeassistant.local:8123 --token YOUR_TOKEN --format json
|
||||
|
||||
Environment variables:
|
||||
HA_URL: Home Assistant URL
|
||||
HA_TOKEN: Long-lived access token
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
|
||||
def get_services(url: str, token: str):
|
||||
"""Get all services from Home Assistant."""
|
||||
api_url = f"{url.rstrip('/')}/api/services"
|
||||
headers = {
|
||||
'Authorization': f'Bearer {token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
req = urllib.request.Request(api_url, headers=headers)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req) as response:
|
||||
return json.loads(response.read().decode('utf-8'))
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='List Home Assistant services')
|
||||
parser.add_argument('--url', help='Home Assistant URL',
|
||||
default=os.getenv('HA_URL'))
|
||||
parser.add_argument('--token', help='Long-lived access token',
|
||||
default=os.getenv('HA_TOKEN'))
|
||||
parser.add_argument('--domain', help='Filter by domain (e.g., light, switch)')
|
||||
parser.add_argument('--format', choices=['tree', 'json', 'list'],
|
||||
default='tree', help='Output format')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.url or not args.token:
|
||||
print("Error: URL and token are required", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
services = get_services(args.url, args.token)
|
||||
|
||||
# Filter by domain if specified
|
||||
if args.domain:
|
||||
services = {args.domain: services.get(args.domain, {})}
|
||||
|
||||
if args.format == 'json':
|
||||
print(json.dumps(services, indent=2))
|
||||
elif args.format == 'list':
|
||||
for domain, domain_services in sorted(services.items()):
|
||||
for service in sorted(domain_services.keys()):
|
||||
print(f"{domain}.{service}")
|
||||
else: # tree format
|
||||
for domain, domain_services in sorted(services.items()):
|
||||
print(f"\n{domain}:")
|
||||
for service, details in sorted(domain_services.items()):
|
||||
description = details.get('description', 'No description')
|
||||
print(f" • {service}")
|
||||
if description and description != 'No description':
|
||||
print(f" {description}")
|
||||
|
||||
# Show fields if available
|
||||
if 'fields' in details and details['fields']:
|
||||
print(f" Fields:")
|
||||
for field_name, field_info in details['fields'].items():
|
||||
field_desc = field_info.get('description', '')
|
||||
print(f" - {field_name}: {field_desc}")
|
||||
|
||||
print(f"\nTotal: {sum(len(s) for s in services.values())} services across {len(services)} domains")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
186
skills/homeassistant/scripts/validate_yaml.py
Executable file
186
skills/homeassistant/scripts/validate_yaml.py
Executable file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Home Assistant YAML Configuration Validator
|
||||
|
||||
This script validates Home Assistant YAML configuration files for syntax errors
|
||||
and common issues.
|
||||
|
||||
Usage:
|
||||
# Validate a single file
|
||||
python3 validate_yaml.py configuration.yaml
|
||||
|
||||
# Validate automation configuration
|
||||
python3 validate_yaml.py automations.yaml --type automation
|
||||
|
||||
# Validate with verbose output
|
||||
python3 validate_yaml.py configuration.yaml --verbose
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
class HomeAssistantYAMLValidator:
|
||||
"""Validator for Home Assistant YAML files."""
|
||||
|
||||
# Common required fields for different config types
|
||||
AUTOMATION_REQUIRED = ['trigger']
|
||||
AUTOMATION_OPTIONAL = ['alias', 'id', 'description', 'condition', 'action', 'mode', 'variables']
|
||||
|
||||
def __init__(self, verbose: bool = False):
|
||||
self.verbose = verbose
|
||||
self.errors: List[str] = []
|
||||
self.warnings: List[str] = []
|
||||
|
||||
def validate_file(self, filepath: Path) -> bool:
|
||||
"""Validate a YAML file."""
|
||||
self.errors = []
|
||||
self.warnings = []
|
||||
|
||||
if not filepath.exists():
|
||||
self.errors.append(f"File not found: {filepath}")
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Check for tabs (Home Assistant doesn't allow tabs)
|
||||
if '\t' in content:
|
||||
self.errors.append("File contains tab characters. Use spaces for indentation.")
|
||||
return False
|
||||
|
||||
# Parse YAML
|
||||
data = yaml.safe_load(content)
|
||||
|
||||
if self.verbose:
|
||||
print(f"✓ YAML syntax is valid")
|
||||
|
||||
return True
|
||||
|
||||
except yaml.YAMLError as e:
|
||||
self.errors.append(f"YAML syntax error: {str(e)}")
|
||||
return False
|
||||
except Exception as e:
|
||||
self.errors.append(f"Error reading file: {str(e)}")
|
||||
return False
|
||||
|
||||
def validate_automation(self, filepath: Path) -> bool:
|
||||
"""Validate an automation configuration file."""
|
||||
if not self.validate_file(filepath):
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8') as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not isinstance(data, list):
|
||||
# Single automation wrapped in automation: key
|
||||
if isinstance(data, dict):
|
||||
if 'automation' in data:
|
||||
automations = data['automation']
|
||||
if not isinstance(automations, list):
|
||||
automations = [automations]
|
||||
else:
|
||||
automations = [data]
|
||||
else:
|
||||
self.errors.append("Automation file must contain a list or dict")
|
||||
return False
|
||||
else:
|
||||
automations = data
|
||||
|
||||
# Validate each automation
|
||||
for i, automation in enumerate(automations):
|
||||
if not isinstance(automation, dict):
|
||||
self.errors.append(f"Automation {i+1} is not a dictionary")
|
||||
continue
|
||||
|
||||
# Check required fields
|
||||
if 'trigger' not in automation:
|
||||
self.errors.append(f"Automation {i+1}: Missing required field 'trigger'")
|
||||
|
||||
# Check for common issues
|
||||
if 'alias' not in automation:
|
||||
self.warnings.append(f"Automation {i+1}: No 'alias' field (recommended for readability)")
|
||||
|
||||
if 'id' not in automation:
|
||||
self.warnings.append(f"Automation {i+1}: No 'id' field (required for UI editor)")
|
||||
|
||||
# Validate trigger structure
|
||||
if 'trigger' in automation:
|
||||
triggers = automation['trigger']
|
||||
if not isinstance(triggers, list):
|
||||
triggers = [triggers]
|
||||
|
||||
for j, trigger in enumerate(triggers):
|
||||
if not isinstance(trigger, dict):
|
||||
self.errors.append(f"Automation {i+1}, Trigger {j+1}: Must be a dictionary")
|
||||
continue
|
||||
if 'trigger' not in trigger and 'platform' not in trigger:
|
||||
self.errors.append(f"Automation {i+1}, Trigger {j+1}: Missing 'trigger' or 'platform' field")
|
||||
|
||||
# Validate action structure if present
|
||||
if 'action' in automation:
|
||||
actions = automation['action']
|
||||
if not isinstance(actions, list):
|
||||
actions = [actions]
|
||||
|
||||
for j, action in enumerate(actions):
|
||||
if not isinstance(action, dict):
|
||||
self.errors.append(f"Automation {i+1}, Action {j+1}: Must be a dictionary")
|
||||
|
||||
if self.verbose and not self.errors:
|
||||
print(f"✓ Validated {len(automations)} automation(s)")
|
||||
|
||||
return len(self.errors) == 0
|
||||
|
||||
except Exception as e:
|
||||
self.errors.append(f"Error validating automation: {str(e)}")
|
||||
return False
|
||||
|
||||
def print_results(self):
|
||||
"""Print validation results."""
|
||||
if self.errors:
|
||||
print("\n❌ Errors found:")
|
||||
for error in self.errors:
|
||||
print(f" • {error}")
|
||||
|
||||
if self.warnings:
|
||||
print("\n⚠️ Warnings:")
|
||||
for warning in self.warnings:
|
||||
print(f" • {warning}")
|
||||
|
||||
if not self.errors and not self.warnings:
|
||||
print("✅ Validation passed with no errors or warnings")
|
||||
elif not self.errors:
|
||||
print("\n✅ Validation passed (with warnings)")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Validate Home Assistant YAML files')
|
||||
parser.add_argument('file', help='YAML file to validate')
|
||||
parser.add_argument('--type', choices=['automation', 'general'],
|
||||
default='general', help='Type of configuration to validate')
|
||||
parser.add_argument('--verbose', '-v', action='store_true',
|
||||
help='Verbose output')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
filepath = Path(args.file)
|
||||
validator = HomeAssistantYAMLValidator(verbose=args.verbose)
|
||||
|
||||
if args.type == 'automation':
|
||||
success = validator.validate_automation(filepath)
|
||||
else:
|
||||
success = validator.validate_file(filepath)
|
||||
|
||||
validator.print_results()
|
||||
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user