How to create and manage changedetection.io watchers through its API with Python

import json
import os
import time
from typing import Any, Dict, Optional

import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

class ChangeDetectionAPI:
    def __init__(self):
        self.base_url = os.getenv('CHANGEDETECTION_BASE_URL', 'https://lemonade.changedetection.io/XXXXXXXX/api')
        self.api_key = os.getenv('CHANGEDETECTION_API_KEY')
        if not self.api_key:
            raise ValueError("API key not found in environment variables")
        
        print(f"Using base URL: {self.base_url}")
        print(f"Using API key: {self.api_key[:4]}...{self.api_key[-4:]}")
        
        self.headers = {
            "x-api-key": self.api_key,
            "Content-Type": "application/json"
        }

    def create_watcher(self, url: str, tag: str, title: str = None) -> Dict[str, Any]:
        """Create a new watcher with retry mechanism"""
        endpoint = f"{self.base_url}/v1/watch"  # Use v1 endpoint as per example
        print(f"Creating watcher at endpoint: {endpoint}")
        
        # Simplified payload based on example
        payload = {
            "url": url,
            "tag": tag
        }
        
        # Add title if provided
        if title:
            payload["title"] = title

        print(f"Request payload: {json.dumps(payload, indent=2)}")
        print(f"Request headers: {json.dumps(self.headers, indent=2)}")

        max_retries = 3
        retry_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                response = requests.post(endpoint, json=payload, headers=self.headers)
                print(f"Response status: {response.status_code}")
                print(f"Response headers: {json.dumps(dict(response.headers), indent=2)}")
                if response.text:
                    print(f"Response body: {response.text}")
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise Exception(f"Failed to create watcher after {max_retries} attempts: {str(e)}")
                print(f"Attempt {attempt + 1} failed, retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)

    def get_watcher_status(self, watcher_id: str) -> Dict[str, Any]:
        """Get the status of an existing watcher"""
        endpoint = f"{self.base_url}/v1/watch/{watcher_id}"
        response = requests.get(endpoint, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def delete_watcher(self, watcher_id: str) -> bool:
        """Delete an existing watcher"""
        endpoint = f"{self.base_url}/v1/watch/{watcher_id}"
        response = requests.delete(endpoint, headers=self.headers)
        return response.status_code == 200

def main():
    """Demo run: create one watcher and, if an id comes back, show its status.

    Any exception raised along the way is caught and reported on stdout as
    ``Error: <message>``; the function itself never raises and returns None.
    """
    try:
        client = ChangeDetectionAPI()

        # Minimal example: one URL, one tag, one title.
        created = client.create_watcher(
            url="https://www.google.com",
            tag="stock",
            title="Title",
        )
        print("Watcher created successfully:", json.dumps(created, indent=2))

        # Without a uuid in the reply there is nothing to look up.
        if 'uuid' not in created:
            return

        watcher_state = client.get_watcher_status(created['uuid'])
        print("Watcher status:", json.dumps(watcher_state, indent=2))
    except Exception as err:
        print(f"Error: {err!s}")

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

1 Star2 Stars3 Stars4 Stars5 Stars (No Ratings Yet)
Loading...