import json
import os
import time
from typing import Any, Dict, Optional

import requests
from dotenv import load_dotenv
# Load environment variables (via python-dotenv) when the module is imported,
# before any ChangeDetectionAPI instance reads its settings with os.getenv().
load_dotenv()
class ChangeDetectionAPI:
    """Small client for the changedetection.io ``/v1/watch`` endpoints.

    Settings are read from the environment when an instance is created:

    * ``CHANGEDETECTION_BASE_URL`` - API root (a placeholder URL is used
      when it is unset).
    * ``CHANGEDETECTION_API_KEY`` - required; sent as the ``x-api-key``
      header on every request.
    """

    # Seconds to wait on the server before giving up on a request. Without
    # an explicit timeout ``requests`` may block forever on a stalled host.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Read configuration from the environment and build the headers.

        Raises:
            ValueError: If ``CHANGEDETECTION_API_KEY`` is unset or empty.
        """
        # A trailing slash would produce "//v1/..." when endpoints are built.
        self.base_url = os.getenv(
            'CHANGEDETECTION_BASE_URL',
            'https://lemonade.changedetection.io/XXXXXXXX/api',
        ).rstrip('/')
        self.api_key = os.getenv('CHANGEDETECTION_API_KEY')
        if not self.api_key:
            raise ValueError("API key not found in environment variables")
        print(f"Using base URL: {self.base_url}")
        print(f"Using API key: {self._mask_secret(self.api_key)}")
        self.headers = {
            "x-api-key": self.api_key,
            "Content-Type": "application/json",
        }

    @staticmethod
    def _mask_secret(secret: str) -> str:
        """Return *secret* with only its first and last four characters shown.

        Secrets of eight characters or fewer are masked completely, because
        showing four characters from each end would reveal all of them.
        """
        if len(secret) <= 8:
            return "****"
        return f"{secret[:4]}...{secret[-4:]}"

    def _loggable_headers(self) -> Dict[str, str]:
        """Return a copy of the request headers with the API key masked."""
        safe = dict(self.headers)
        if "x-api-key" in safe:
            safe["x-api-key"] = self._mask_secret(str(safe["x-api-key"]))
        return safe

    def create_watcher(self, url: str, tag: str, title: Optional[str] = None) -> Dict[str, Any]:
        """Create a new watcher, retrying failed requests.

        Args:
            url: Page URL the watcher should monitor.
            tag: Tag to attach to the watcher.
            title: Optional title; omitted from the payload when falsy.

        Returns:
            The decoded JSON body of the successful response.

        Raises:
            Exception: If all attempts fail. The last ``requests`` error is
                chained as ``__cause__``.
        """
        endpoint = f"{self.base_url}/v1/watch"
        print(f"Creating watcher at endpoint: {endpoint}")

        payload = {
            "url": url,
            "tag": tag,
        }
        if title:
            payload["title"] = title

        print(f"Request payload: {json.dumps(payload, indent=2)}")
        # Log a masked copy: the real headers carry the API key in clear text.
        print(f"Request headers: {json.dumps(self._loggable_headers(), indent=2)}")

        max_retries = 3
        retry_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                response = requests.post(
                    endpoint,
                    json=payload,
                    headers=self.headers,
                    timeout=self.REQUEST_TIMEOUT,
                )
                print(f"Response status: {response.status_code}")
                print(f"Response headers: {json.dumps(dict(response.headers), indent=2)}")
                if response.text:
                    print(f"Response body: {response.text}")
                # HTTP error statuses raise HTTPError, a RequestException,
                # so they go through the same retry path as network errors.
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise Exception(
                        f"Failed to create watcher after {max_retries} attempts: {str(e)}"
                    ) from e
                print(f"Attempt {attempt + 1} failed, retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)

    def get_watcher_status(self, watcher_id: str) -> Dict[str, Any]:
        """Fetch an existing watcher and return the decoded JSON response.

        Raises:
            requests.exceptions.RequestException: On network failure, timeout
                or an HTTP error status.
        """
        endpoint = f"{self.base_url}/v1/watch/{watcher_id}"
        response = requests.get(
            endpoint, headers=self.headers, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response.json()

    def delete_watcher(self, watcher_id: str) -> bool:
        """Delete an existing watcher.

        Returns:
            ``True`` when the server answers with any 2xx status, otherwise
            ``False``.
        """
        endpoint = f"{self.base_url}/v1/watch/{watcher_id}"
        response = requests.delete(
            endpoint, headers=self.headers, timeout=self.REQUEST_TIMEOUT
        )
        # Accept the whole 2xx range: a successful delete may be reported as
        # 204 No Content rather than 200 (NOTE(review): confirm against the
        # deployed API version).
        return 200 <= response.status_code < 300
def main():
    """Run the example: create a watcher for a sample URL, then show it.

    Any error raised along the way is caught and printed rather than
    propagated.
    """
    target = "https://www.google.com"
    try:
        client = ChangeDetectionAPI()

        # Only the minimal fields are sent when creating the watcher.
        created = client.create_watcher(url=target, tag="stock", title="Title")
        print("Watcher created successfully:", json.dumps(created, indent=2))

        # The status lookup needs the identifier from the create response.
        if 'uuid' not in created:
            return
        details = client.get_watcher_status(created['uuid'])
        print("Watcher status:", json.dumps(details, indent=2))
    except Exception as exc:
        print(f"Error: {exc!s}")
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()