from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
import logging
import asyncio
from datetime import datetime
from typing import Dict, Any, Optional, Callable, List
from kafka.admin import KafkaAdminClient


logger = logging.getLogger(__name__)

class SearchKafkaClientOld:
    def __init__(self, bootstrap_servers: list, consumer_group: str):
        self.bootstrap_servers = bootstrap_servers
        self.consumer_group = consumer_group
        self.producer: Optional[AIOKafkaProducer] = None
        self.consumer: Optional[AIOKafkaConsumer] = None
        
    async def start_producer(self):
        """Start Kafka producer"""
        try:
            self.producer = AIOKafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None
            )
            await self.producer.start()
            logger.info("Kafka producer started")
        except Exception as e:
            logger.error(f"Error starting Kafka producer: {e}")
            raise
    
    async def start_consumer(self, topics: list):
        """Start Kafka consumer"""
        try:
            self.consumer = AIOKafkaConsumer(
                *topics,
                bootstrap_servers=self.bootstrap_servers,
                group_id=self.consumer_group,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                auto_offset_reset='latest'
            )
            await self.consumer.start()
            logger.info(f"Kafka consumer started for topics: {topics}")
        except Exception as e:
            logger.error(f"Error starting Kafka consumer: {e}")
            raise
    
    async def send_message(self, topic: str, message: Dict[str, Any], key: Optional[str] = None):
        """Send message to Kafka topic"""
        try:
            if not self.producer:
                await self.start_producer()
            
            await self.producer.send(topic, value=message, key=key)
            logger.debug(f"Message sent to topic {topic}")
        except Exception as e:
            logger.error(f"Error sending message to {topic}: {e}")
            raise
    
    async def consume_messages(self, message_handler: Callable[[Dict[str, Any]], None]):
        """Consume messages from Kafka"""
        try:
            if not self.consumer:
                raise RuntimeError("Consumer not started")
            
            async for message in self.consumer:
                try:
                    await message_handler(message.value)
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
        except Exception as e:
            logger.error(f"Error consuming messages: {e}")
            raise
    
    async def send_search_result(self, task_id: str, platform: str, results: Dict[str, Any], status: str = "completed"):
        """Send search results"""
        result_message = {
            "type": "task_completed" if status == "completed" else "task_failed",
            "task_id": task_id,
            "platform": platform,
            "results": results,
            "timestamp": datetime.utcnow().isoformat(),
            "status": status
        }
        
        await self.send_message("search-results", result_message, key=task_id)
    
    async def send_error_result(self, task_id: str, platform: str, error: str):
        """Send error result"""
        error_message = {
            "type": "task_failed",
            "task_id": task_id,
            "platform": platform,
            "error": error,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "failed"
        }
        
        await self.send_message("search-results", error_message, key=task_id)
    
    async def stop(self):
        """Stop producer and consumer"""
        if self.producer:
            await self.producer.stop()
            logger.info("Kafka producer stopped")
        
        if self.consumer:
            await self.consumer.stop()
            logger.info("Kafka consumer stopped")






try:
    from kafka.admin import KafkaAdminClient
    KAFKA_ADMIN_AVAILABLE = True
except ImportError:
    KAFKA_ADMIN_AVAILABLE = False
    logger.warning("kafka-python not available, topic checking disabled")

logger = logging.getLogger(__name__)

class SearchKafkaClient:
    def __init__(self, bootstrap_servers: list, consumer_group: str):
        self.bootstrap_servers = bootstrap_servers
        self.consumer_group = consumer_group
        self.producer: Optional[AIOKafkaProducer] = None
        self.consumer: Optional[AIOKafkaConsumer] = None
        
    async def wait_for_kafka(self, timeout: int = 60) -> bool:
        """Wait for Kafka to be ready before starting producer/consumer"""
        start_time = asyncio.get_event_loop().time()
        
        # Validate bootstrap_servers format
        if not isinstance(self.bootstrap_servers, list):
            logger.error(f"Invalid bootstrap_servers format: {self.bootstrap_servers} (should be list)")
            raise ValueError(f"bootstrap_servers must be a list, got {type(self.bootstrap_servers)}")
        
        logger.info(f"Testing Kafka connection to: {self.bootstrap_servers}")
        
        while (asyncio.get_event_loop().time() - start_time) < timeout:
            test_producer = None
            try:
                # Simple connection test using a producer
                test_producer = AIOKafkaProducer(
                    bootstrap_servers=self.bootstrap_servers,
                    request_timeout_ms=10000,
                    retry_backoff_ms=1000,
                    # max_block_ms=5000
                    # DO NOT add retries or message_send_max_retries
                )
                
                await asyncio.wait_for(test_producer.start(), timeout=10)
                logger.info("Kafka cluster is ready!")
                return True
                
            except asyncio.TimeoutError:
                logger.info("Kafka connection timeout, retrying...")
            except ConnectionError as e:
                logger.info(f"Kafka connection error: {e}")
            except Exception as e:
                logger.info(f"Waiting for Kafka cluster... ({type(e).__name__}: {str(e)[:100]})")
            finally:
                if test_producer:
                    try:
                        await asyncio.wait_for(test_producer.stop(), timeout=5)
                    except Exception:
                        pass  # Ignore cleanup errors
                
            await asyncio.sleep(3)  # Increased sleep time
        
        raise TimeoutError(f"Kafka cluster not ready within {timeout} seconds")

    async def wait_for_topics(self, topics: List[str], timeout: int = 60) -> bool:
        """Wait for topics to be available"""
        if not KAFKA_ADMIN_AVAILABLE:
            logger.info("Kafka admin client not available, skipping topic check")
            return True
            
        start_time = asyncio.get_event_loop().time()
        
        while (asyncio.get_event_loop().time() - start_time) < timeout:
            try:
                admin_client = KafkaAdminClient(
                    bootstrap_servers=self.bootstrap_servers,
                    request_timeout_ms=5000
                )
                
                # Get cluster metadata to check if topics exist
                metadata = admin_client.describe_topics(topics)
                
                # Check if all topics are available
                available_topics = set(metadata)
                required_topics = set(topics)
                
                if required_topics.issubset(available_topics):
                    logger.info(f"All required topics are available: {topics}")
                    admin_client.close()
                    return True
                else:
                    missing_topics = required_topics - available_topics
                    logger.info(f"Waiting for topics to be available: {missing_topics}")
                
                admin_client.close()
                
            except Exception as e:
                logger.info(f"Waiting for topics {topics}... ({e})")
            
            await asyncio.sleep(2)
        
        logger.warning(f"Topics {topics} not available within {timeout} seconds, proceeding anyway")
        return False
        
    async def start_producer(self, max_retries: int = 10, initial_delay: float = 1.0):
        """Start Kafka producer with retry logic"""
        
        # Wait for Kafka to be ready
        try:
            await self.wait_for_kafka(timeout=60)
        except TimeoutError as e:
            logger.error(f"Kafka cluster not ready: {e}")
            raise
        
        # Wait for result topics to be available
        await self.wait_for_topics(["search-results"], timeout=30)
        
        last_exception = None
        
        for attempt in range(max_retries):
            try:
                self.producer = AIOKafkaProducer(
                    bootstrap_servers=self.bootstrap_servers,
                    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
                    key_serializer=lambda k: k.encode('utf-8') if k else None,
                    # Enhanced settings
                    request_timeout_ms=30000,
                    retry_backoff_ms=1000,
                    connections_max_idle_ms=600000,
                    acks='all',  # Wait for all replicas to acknowledge
                    
                    max_in_flight_requests_per_connection=1
                )
                await self.producer.start()
                logger.info(f"Kafka producer started successfully (attempt {attempt + 1})")
                return
                
            except Exception as e:
                last_exception = e
                
                # Calculate delay with exponential backoff
                delay = min(initial_delay * (2 ** attempt), 30)  # Cap at 30 seconds
                
                logger.warning(
                    f"Attempt {attempt + 1}/{max_retries} failed to start Kafka producer: {e}. "
                    f"Retrying in {delay:.1f} seconds..."
                )
                
                # Clean up failed producer
                if self.producer:
                    try:
                        await self.producer.stop()
                    except:
                        pass
                    self.producer = None
                
                if attempt < max_retries - 1:  # Don't sleep on last attempt
                    await asyncio.sleep(delay)
        
        # If we get here, all retries failed
        error_msg = f"Failed to start Kafka producer after {max_retries} attempts. Last error: {last_exception}"
        logger.error(error_msg)
        raise Exception(error_msg)
    
    async def start_consumer(self, topics: List[str], max_retries: int = 10, initial_delay: float = 1.0, skip_health_check: bool = False):
        """Start Kafka consumer with retry logic and health checks"""
        
        # Step 1: Wait for Kafka to be ready (optional)
        if not skip_health_check:
            try:
                await self.wait_for_kafka(timeout=60)
            except TimeoutError as e:
                logger.warning(f"Health check failed, proceeding anyway: {e}")
        
        # Step 2: Wait for topics to be available
        await self.wait_for_topics(topics, timeout=30)
        
        # Step 3: Start consumer with retry logic
        last_exception = None
        
        for attempt in range(max_retries):
            try:
                self.consumer = AIOKafkaConsumer(
                    *topics,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=self.consumer_group,
                    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                    auto_offset_reset='latest',
                    # Enhanced timeout settings
                    request_timeout_ms=30000,
                    retry_backoff_ms=1000,
                    session_timeout_ms=30000,
                    heartbeat_interval_ms=10000,
                    connections_max_idle_ms=600000,
                    # Enable auto-commit with reasonable interval
                    enable_auto_commit=True,
                    auto_commit_interval_ms=5000,
                    api_version="auto"
                )
                
                await self.consumer.start()
                logger.info(f"Kafka consumer started successfully for topics: {topics} (attempt {attempt + 1})")
                return
                
            except Exception as e:
                last_exception = e
                
                # Calculate delay with exponential backoff
                delay = min(initial_delay * (2 ** attempt), 30)  # Cap at 30 seconds
                
                logger.warning(
                    f"Attempt {attempt + 1}/{max_retries} failed to start Kafka consumer: {e}. "
                    f"Retrying in {delay:.1f} seconds..."
                )
                
                # Clean up failed consumer
                if self.consumer:
                    try:
                        await self.consumer.stop()
                    except:
                        pass
                    self.consumer = None
                
                if attempt < max_retries - 1:  # Don't sleep on last attempt
                    await asyncio.sleep(delay)
        
        # If we get here, all retries failed
        error_msg = f"Failed to start Kafka consumer after {max_retries} attempts. Last error: {last_exception}"
        logger.error(error_msg)
        raise Exception(error_msg)
    
    async def send_message(self, topic: str, message: Dict[str, Any], key: Optional[str] = None, max_retries: int = 3):
        """Send message to Kafka topic with retry logic"""
        if not self.producer:
            await self.start_producer()
        
        last_exception = None
        
        for attempt in range(max_retries):
            try:
                # Send message and wait for acknowledgment
                future = await self.producer.send(topic, value=message, key=key)
                record_metadata = await future
                logger.debug(f"Message sent to topic {topic}, partition {record_metadata.partition}, offset {record_metadata.offset}")
                return record_metadata
                
            except Exception as e:
                last_exception = e
                logger.warning(f"Attempt {attempt + 1}/{max_retries} failed to send message to {topic}: {e}")
                
                if attempt < max_retries - 1:
                    await asyncio.sleep(1 * (attempt + 1))  # Linear backoff for sends
        
        logger.error(f"Failed to send message to {topic} after {max_retries} attempts: {last_exception}")
        raise last_exception
    
    async def consume_messages(self, message_handler: Callable[[Dict[str, Any]], None]):
        """Consume messages from Kafka with enhanced error handling"""
        if not self.consumer:
            raise RuntimeError("Consumer not started")
        
        try:
            async for message in self.consumer:
                try:
                    logger.debug(f"Processing message from topic {message.topic}, partition {message.partition}, offset {message.offset}")
                    await message_handler(message.value)
                except Exception as e:
                    logger.error(f"Error processing message from {message.topic}: {e}", exc_info=True)
                    # Continue processing other messages even if one fails
                    
        except asyncio.CancelledError:
            logger.info("Message consumption cancelled")
            raise
        except Exception as e:
            logger.error(f"Critical error in message consumption: {e}", exc_info=True)
            raise
    
    async def send_search_result(self, task_id: str, platform: str, results: Dict[str, Any], status: str = "completed"):
        """Send search results with retry logic"""
        result_message = {
            "type": "task_completed" if status == "completed" else "task_failed",
            "task_id": task_id,
            "platform": platform,
            "results": results,
            "timestamp": datetime.utcnow().isoformat(),
            "status": status
        }
        
        try:
            await self.send_message("search-results", result_message, key=task_id)
            logger.info(f"Search result sent for task {task_id} with status {status}")
        except Exception as e:
            logger.error(f"Failed to send search result for task {task_id}: {e}")
            raise
    
    async def send_error_result(self, task_id: str, platform: str, error: str):
        """Send error result with retry logic"""
        error_message = {
            "type": "task_failed",
            "task_id": task_id,
            "platform": platform,
            "error": error,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "failed"
        }
        
        try:
            await self.send_message("search-results", error_message, key=task_id)
            logger.info(f"Error result sent for task {task_id}")
        except Exception as e:
            logger.error(f"Failed to send error result for task {task_id}: {e}")
            # Don't re-raise here as this is already an error path
    
    async def stop(self):
        """Stop producer and consumer gracefully"""
        if self.producer:
            try:
                await self.producer.stop()
                logger.info("Kafka producer stopped successfully")
            except Exception as e:
                logger.error(f"Error stopping Kafka producer: {e}")
            finally:
                self.producer = None
        
        if self.consumer:
            try:
                await self.consumer.stop()
                logger.info("Kafka consumer stopped successfully")
            except Exception as e:
                logger.error(f"Error stopping Kafka consumer: {e}")
            finally:
                self.consumer = None
    
    async def __aenter__(self):
        """Async context manager entry"""
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        await self.stop()