
Streaming Responses and Real-Time AI Interfaces

AI & Machine Learning · 🔥 Expert · 26 min read · Apr 27, 2026 · Updated Apr 27, 2026
Table of Contents
  • Prerequisites
  • The Anatomy of Streaming AI Responses
  • Server-Side Streaming Architecture
  • Server-Sent Events (SSE) Implementation
  • Advanced WebSocket Implementation
  • Client-Side Rendering and State Management
  • Progressive Text Rendering
  • React Integration with Streaming State
  • Backpressure and Flow Control
  • Server-Side Backpressure Handling
  • Client-Side Flow Control
  • Error Handling and Recovery
  • Comprehensive Error Handling Strategy
  • Client-Side Recovery and Reconnection
  • Performance Optimization and Scaling
  • High-Performance Server Architecture

Imagine you're building a customer support chatbot that needs to handle complex technical queries. A user asks: "Can you walk me through setting up distributed logging across our microservices architecture with proper correlation IDs and error aggregation?" With traditional request-response patterns, your user sits staring at a blank screen for 15-20 seconds while the LLM processes this complex query. They start wondering if the system froze. They might even refresh the page or give up entirely.

Now imagine the same scenario with streaming responses. Within 500 milliseconds, words start flowing onto the screen: "I'll help you set up distributed logging for your microservices. Let's start with..." The user immediately knows the system is working and can begin reading and processing the response as it arrives. This isn't just a better user experience—it's a fundamentally different interaction paradigm that makes AI feel more conversational and responsive.

Streaming responses represent a critical shift from traditional batch processing to real-time, interactive AI interfaces. But implementing them properly requires understanding not just the API calls, but the entire architecture stack: WebSocket connections, backpressure handling, client-side rendering patterns, error recovery, and the subtle but crucial details that separate professional implementations from demos that break under load.

What you'll learn:

  • How to implement server-sent events (SSE) and WebSocket streaming for LLM responses
  • Advanced client-side rendering patterns for handling partial tokens and formatting
  • Backpressure management and flow control in streaming pipelines
  • Error handling and recovery strategies for interrupted streams
  • Performance optimization techniques for high-throughput streaming scenarios
  • Production-ready architectures for real-time AI interfaces at scale

Prerequisites

You should have solid experience with asynchronous JavaScript/Python programming, REST APIs, and basic WebSocket concepts. Familiarity with React or similar reactive frameworks is helpful but not required. You should understand HTTP streaming concepts and have worked with LLM APIs like OpenAI's GPT models.

The Anatomy of Streaming AI Responses

Before diving into implementation, let's understand what happens when an LLM generates a streaming response. Traditional API calls are atomic—you send a request and receive a complete response. Streaming breaks this into a continuous flow of partial responses called "chunks" or "tokens."

import openai  # legacy (pre-1.0) openai-python interface, used throughout this article

# Traditional approach - blocks until complete
response = openai.ChatCompletion.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain quantum computing"}]
)
complete_text = response.choices[0].message.content
print(complete_text)  # All at once after 10+ seconds

# Streaming approach - yields partial responses
response = openai.ChatCompletion.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True
)

for chunk in response:
    if chunk.choices[0].delta.content:
        partial_text = chunk.choices[0].delta.content
        print(partial_text, end="", flush=True)  # Appears incrementally

The magic happens in that stream=True parameter, but the real complexity lies in everything that comes after: how do you transport these chunks to your frontend, render them smoothly, handle connection failures, and maintain state consistency?

Server-Side Streaming Architecture

Server-Sent Events (SSE) Implementation

SSE provides a simple, standards-based approach for streaming data from server to client. Unlike WebSockets, SSE is unidirectional and automatically handles connection recovery, making it ideal for LLM streaming where the client primarily consumes data.

from flask import Flask, Response, request
import json
import time
import openai
from typing import Iterator

app = Flask(__name__)

def generate_streaming_response(messages: list) -> Iterator[str]:
    """Generate SSE-formatted streaming response from OpenAI."""
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=messages,
            stream=True,
            temperature=0.7
        )
        
        for chunk in response:
            # Handle different chunk types
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                # Format as SSE event
                data = json.dumps({
                    "type": "content",
                    "content": content,
                    "timestamp": time.time()
                })
                yield f"data: {data}\n\n"
            
            elif chunk.choices[0].finish_reason:
                # Signal completion
                data = json.dumps({
                    "type": "complete",
                    "finish_reason": chunk.choices[0].finish_reason
                })
                yield f"data: {data}\n\n"
                
    except Exception as e:
        # Always send error through the stream
        error_data = json.dumps({
            "type": "error",
            "message": str(e),
            "error_code": getattr(e, 'code', 'unknown')
        })
        yield f"data: {error_data}\n\n"

@app.route('/api/chat/stream', methods=['POST'])
def stream_chat():
    messages = request.json.get('messages', [])
    
    return Response(
        generate_streaming_response(messages),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': 'Cache-Control'
        }
    )

This implementation handles several critical aspects:

  1. Structured chunk format: Each chunk contains metadata beyond just content
  2. Error propagation: Errors flow through the same stream rather than breaking it
  3. Completion signaling: The client knows definitively when the response is complete
  4. Proper SSE formatting: Each message follows the data: {json}\n\n format

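A quick way to sanity-check the endpoint from a script is to consume the event stream directly. Below is a minimal sketch using the requests library (an assumption, not part of the server code above); it presumes the Flask app is running locally on port 5000. A browser would instead read the stream with fetch, as the React section shows later.

import json
import requests

def consume_sse_stream(url: str, messages: list) -> str:
    """Read the SSE stream from the Flask endpoint above and rebuild the full reply."""
    full_text = []
    with requests.post(url, json={"messages": messages}, stream=True) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue  # skip the blank separator lines between events
            event = json.loads(line[len("data: "):])
            if event["type"] == "content":
                print(event["content"], end="", flush=True)
                full_text.append(event["content"])
            elif event["type"] == "complete":
                break
            elif event["type"] == "error":
                raise RuntimeError(event["message"])
    return "".join(full_text)

# Example usage against the local development server
reply = consume_sse_stream(
    "http://localhost:5000/api/chat/stream",
    [{"role": "user", "content": "Explain quantum computing"}],
)
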
Advanced WebSocket Implementation

For bidirectional communication or when you need more control over the connection lifecycle, WebSockets provide greater flexibility:

import asyncio
import json
import logging
import time
import uuid
from typing import Dict, Set

import openai
import websockets

class StreamingChatManager:
    def __init__(self):
        self.active_connections: Dict[str, websockets.WebSocketServerProtocol] = {}
        self.connection_metadata: Dict[str, dict] = {}
        
    async def register_connection(self, websocket: websockets.WebSocketServerProtocol, 
                                connection_id: str, user_id: str):
        """Register and track WebSocket connections."""
        self.active_connections[connection_id] = websocket
        self.connection_metadata[connection_id] = {
            "user_id": user_id,
            "connected_at": time.time(),
            "messages_sent": 0
        }
        logging.info(f"Connection {connection_id} registered for user {user_id}")
        
    async def unregister_connection(self, connection_id: str):
        """Clean up connection tracking."""
        self.active_connections.pop(connection_id, None)
        self.connection_metadata.pop(connection_id, None)
        logging.info(f"Connection {connection_id} unregistered")
        
    async def stream_llm_response(self, connection_id: str, messages: list, 
                                 stream_config: dict = None):
        """Stream LLM response through WebSocket with advanced features."""
        websocket = self.active_connections.get(connection_id)
        if not websocket:
            logging.error(f"No active connection for {connection_id}")
            return
            
        config = stream_config or {}
        chunk_buffer = []
        buffer_size = config.get('buffer_size', 1)  # Tokens per chunk
        
        try:
            # Send initial status
            await websocket.send(json.dumps({
                "type": "stream_start",
                "request_id": connection_id,
                "model": config.get('model', 'gpt-4')
            }))
            
            response = await openai.ChatCompletion.acreate(
                model=config.get('model', 'gpt-4'),
                messages=messages,
                stream=True,
                temperature=config.get('temperature', 0.7),
                max_tokens=config.get('max_tokens', 2000)
            )
            
            async for chunk in response:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    chunk_buffer.append(content)
                    
                    # Send buffered chunks to smooth out delivery
                    if len(chunk_buffer) >= buffer_size:
                        combined_content = ''.join(chunk_buffer)
                        await websocket.send(json.dumps({
                            "type": "content_chunk",
                            "content": combined_content,
                            "chunk_index": self.connection_metadata[connection_id]["messages_sent"]
                        }))
                        chunk_buffer = []
                        self.connection_metadata[connection_id]["messages_sent"] += 1
                        
                        # Add small delay to prevent overwhelming client
                        await asyncio.sleep(0.01)
                        
            # Send any remaining buffered content
            if chunk_buffer:
                combined_content = ''.join(chunk_buffer)
                await websocket.send(json.dumps({
                    "type": "content_chunk", 
                    "content": combined_content,
                    "chunk_index": self.connection_metadata[connection_id]["messages_sent"]
                }))
                
            # Signal completion
            await websocket.send(json.dumps({
                "type": "stream_complete",
                "total_chunks": self.connection_metadata[connection_id]["messages_sent"] + 1,
                "completion_time": time.time()
            }))
            
        except websockets.exceptions.ConnectionClosed:
            logging.info(f"Connection {connection_id} closed during streaming")
            await self.unregister_connection(connection_id)
        except Exception as e:
            await websocket.send(json.dumps({
                "type": "error",
                "error": str(e),
                "error_type": type(e).__name__
            }))
            logging.error(f"Streaming error for {connection_id}: {e}")

# WebSocket handler
chat_manager = StreamingChatManager()

async def handle_websocket(websocket, path):
    connection_id = str(uuid.uuid4())
    try:
        # Handle connection authentication/setup
        auth_message = await websocket.recv()
        auth_data = json.loads(auth_message)
        user_id = auth_data.get('user_id')
        
        await chat_manager.register_connection(websocket, connection_id, user_id)
        
        await websocket.send(json.dumps({
            "type": "connection_established",
            "connection_id": connection_id
        }))
        
        async for message in websocket:
            data = json.loads(message)
            if data['type'] == 'chat_request':
                await chat_manager.stream_llm_response(
                    connection_id, 
                    data['messages'],
                    data.get('config', {})
                )
                
    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        await chat_manager.unregister_connection(connection_id)

# Start WebSocket server
start_server = websockets.serve(handle_websocket, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

The WebSocket implementation provides several advantages over SSE:

  1. Bidirectional communication: Client can send additional messages or control signals
  2. Connection management: Explicit tracking of connection state and metadata
  3. Buffering control: Configurable chunk sizes to optimize network usage
  4. Enhanced error handling: More granular error reporting and recovery

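To exercise this handler end to end, a minimal Python client can perform the same handshake the server expects: send the auth message, wait for connection_established, then issue a chat_request and print chunks until stream_complete. This is a sketch that assumes the server above is running on localhost:8765; a browser client would use the native WebSocket API instead.

import asyncio
import json
import websockets

async def chat_once(messages: list, user_id: str = "demo-user"):
    """Connect, authenticate, send one chat_request, and print streamed chunks."""
    async with websockets.connect("ws://localhost:8765") as ws:
        # The server expects an auth message before anything else
        await ws.send(json.dumps({"user_id": user_id}))
        await ws.recv()  # connection_established

        await ws.send(json.dumps({
            "type": "chat_request",
            "messages": messages,
            "config": {"model": "gpt-4", "buffer_size": 1}
        }))

        async for raw in ws:
            event = json.loads(raw)
            if event["type"] == "content_chunk":
                print(event["content"], end="", flush=True)
            elif event["type"] == "stream_complete":
                break
            elif event["type"] == "error":
                raise RuntimeError(event["error"])

asyncio.run(chat_once([{"role": "user", "content": "Explain quantum computing"}]))
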
Client-Side Rendering and State Management

The client side of streaming responses involves complex state management challenges. You're not just displaying text—you're managing partial renders, handling formatting, and maintaining smooth UX during network interruptions.

Progressive Text Rendering

class StreamingTextRenderer {
    constructor(containerElement, options = {}) {
        this.container = containerElement;
        this.buffer = '';
        this.renderedLength = 0;
        this.options = {
            typewriterDelay: options.typewriterDelay || 0,
            chunkSize: options.chunkSize || 10,
            enableMarkdown: options.enableMarkdown ?? true,
            ...options
        };
        
        // For markdown parsing of partial content
        this.markdownParser = new marked.Renderer();
        this.codeBlockPattern = /```(\w+)?\n([\s\S]*?)\n```/g;
        this.incompleteCodeBlock = false;
    }
    
    appendContent(newContent) {
        this.buffer += newContent;
        
        if (this.options.typewriterDelay > 0) {
            this.renderWithTypewriter();
        } else {
            this.renderImmediate();
        }
    }
    
    renderImmediate() {
        if (this.options.enableMarkdown) {
            this.renderMarkdownSafe();
        } else {
            this.renderPlainText();
        }
    }
    
    renderMarkdownSafe() {
        // Handle partial markdown gracefully
        let renderableContent = this.buffer;
        let tempContainer = document.createElement('div');
        
        // Check for incomplete code blocks
        const codeBlockMatches = [...this.buffer.matchAll(this.codeBlockPattern)];
        const lastTripleBacktick = this.buffer.lastIndexOf('```');
        
        if (lastTripleBacktick > -1) {
            const afterLastBacktick = this.buffer.slice(lastTripleBacktick + 3);
            // If we don't have a closing ```, treat as incomplete
            if (!afterLastBacktick.includes('```')) {
                this.incompleteCodeBlock = true;
                // Only render up to the incomplete code block
                renderableContent = this.buffer.slice(0, lastTripleBacktick);
            } else {
                this.incompleteCodeBlock = false;
            }
        }
        
        try {
            // Parse markdown but handle incomplete structures
            let parsed = marked.parse(renderableContent);
            tempContainer.innerHTML = parsed;
            
            // If we have an incomplete code block, add it as plain text
            if (this.incompleteCodeBlock) {
                const incompleteBlock = this.buffer.slice(lastTripleBacktick);
                const codeElement = document.createElement('pre');
                codeElement.className = 'incomplete-code-block';
                codeElement.textContent = incompleteBlock;
                tempContainer.appendChild(codeElement);
            }
            
            this.container.innerHTML = tempContainer.innerHTML;
            
        } catch (e) {
            // Fallback to plain text if markdown parsing fails
            this.renderPlainText();
        }
    }
    
    renderPlainText() {
        // Simple but fast text rendering with basic formatting preservation
        const lines = this.buffer.split('\n');
        let html = '';
        
        for (let i = 0; i < lines.length; i++) {
            const line = this.escapeHtml(lines[i]);
            if (i < lines.length - 1) {
                html += line + '<br>';
            } else {
                html += line;
            }
        }
        
        this.container.innerHTML = html;
    }
    
    renderWithTypewriter() {
        // Smooth typewriter effect for better UX
        const currentVisible = this.container.textContent.length;
        const targetLength = Math.min(
            currentVisible + this.options.chunkSize,
            this.buffer.length
        );
        
        if (currentVisible < targetLength) {
            const nextChunk = this.buffer.slice(currentVisible, targetLength);
            this.container.textContent += nextChunk;
            
            if (targetLength < this.buffer.length) {
                setTimeout(() => this.renderWithTypewriter(), this.options.typewriterDelay);
            }
        }
    }
    
    escapeHtml(text) {
        const div = document.createElement('div');
        div.textContent = text;
        return div.innerHTML;
    }
    
    complete() {
        // Final render pass for complete content
        this.incompleteCodeBlock = false;
        this.renderImmediate();
        
        // Trigger syntax highlighting if available
        if (typeof Prism !== 'undefined') {
            Prism.highlightAllUnder(this.container);
        }
    }
}

React Integration with Streaming State

import React, { useState, useEffect, useRef, useCallback } from 'react';

const StreamingChatInterface = ({ apiEndpoint, messages }) => {
    const [streamingResponse, setStreamingResponse] = useState('');
    const [isStreaming, setIsStreaming] = useState(false);
    const [streamError, setStreamError] = useState(null);
    const [connectionStatus, setConnectionStatus] = useState('disconnected');
    
    const abortControllerRef = useRef(null);
    const rendererRef = useRef(null);
    const responseContainerRef = useRef(null);
    
    // Initialize text renderer
    useEffect(() => {
        if (responseContainerRef.current) {
            rendererRef.current = new StreamingTextRenderer(
                responseContainerRef.current,
                {
                    enableMarkdown: true,
                    typewriterDelay: 0,
                    chunkSize: 15
                }
            );
        }
    }, []);
    
    const startStreaming = useCallback(async () => {
        if (isStreaming) return;
        
        setIsStreaming(true);
        setStreamError(null);
        setStreamingResponse('');
        setConnectionStatus('connecting');
        
        try {
            // Abort controller lets the user cancel generation mid-stream
            abortControllerRef.current = new AbortController();

            const response = await fetch(`${apiEndpoint}/chat/stream`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Accept': 'text/event-stream',
                },
                body: JSON.stringify({ messages }),
                signal: abortControllerRef.current.signal
            });
            
            if (!response.ok) {
                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
            }
            
            setConnectionStatus('connected');
            
            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            let buffer = '';
            
            while (true) {
                const { done, value } = await reader.read();
                
                if (done) {
                    setConnectionStatus('completed');
                    break;
                }
                
                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n\n');
                buffer = lines.pop(); // Keep incomplete line in buffer
                
                for (const line of lines) {
                    if (line.startsWith('data: ')) {
                        try {
                            const data = JSON.parse(line.slice(6));
                            handleStreamData(data);
                        } catch (e) {
                            console.warn('Failed to parse SSE data:', line);
                        }
                    }
                }
            }
            
        } catch (error) {
            // Ignore the AbortError raised when the user clicks Stop
            if (error.name !== 'AbortError') {
                setStreamError(error.message);
                setConnectionStatus('error');
            }
        } finally {
            setIsStreaming(false);
        }
    }, [apiEndpoint, messages, isStreaming]);
    
    const handleStreamData = useCallback((data) => {
        switch (data.type) {
            case 'content':
                setStreamingResponse(prev => {
                    const newContent = prev + data.content;
                    // Update renderer
                    if (rendererRef.current) {
                        rendererRef.current.appendContent(data.content);
                    }
                    return newContent;
                });
                break;
                
            case 'complete':
                setIsStreaming(false);
                setConnectionStatus('completed');
                if (rendererRef.current) {
                    rendererRef.current.complete();
                }
                break;
                
            case 'error':
                setStreamError(data.message);
                setIsStreaming(false);
                setConnectionStatus('error');
                break;
                
            default:
                console.log('Unknown stream data type:', data.type);
        }
    }, []);
    
    const stopStreaming = useCallback(() => {
        if (abortControllerRef.current) {
            abortControllerRef.current.abort();
        }
        setIsStreaming(false);
        setConnectionStatus('disconnected');
    }, []);
    
    return (
        <div className="streaming-chat-interface">
            <div className="connection-status">
                Status: <span className={`status-${connectionStatus}`}>
                    {connectionStatus}
                </span>
                {isStreaming && (
                    <button onClick={stopStreaming} className="stop-button">
                        Stop Generation
                    </button>
                )}
            </div>
            
            <div 
                ref={responseContainerRef}
                className="streaming-response"
                style={{
                    minHeight: '200px',
                    padding: '16px',
                    border: '1px solid #ddd',
                    borderRadius: '8px',
                    whiteSpace: 'pre-wrap',
                    fontFamily: 'monospace'
                }}
            />
            
            {streamError && (
                <div className="error-display">
                    <strong>Error:</strong> {streamError}
                    <button onClick={() => setStreamError(null)}>Dismiss</button>
                </div>
            )}
            
            {!isStreaming && (
                <button 
                    onClick={startStreaming}
                    className="start-streaming-button"
                    disabled={messages.length === 0}
                >
                    Start Streaming Response
                </button>
            )}
        </div>
    );
};

export default StreamingChatInterface;

Backpressure and Flow Control

One of the most critical aspects of production streaming systems is handling backpressure—what happens when the client can't process data as fast as the server sends it. This becomes especially important when dealing with fast LLMs or slower client devices.

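The underlying principle is a bounded buffer between producer and consumer: when the buffer fills, the producer waits instead of piling data up in memory. Here is a minimal, self-contained illustration of that idea with asyncio.Queue; the token generator and slow consumer are stand-ins for an LLM stream and a rendering client, not part of the implementations that follow.

import asyncio

async def produce_tokens(queue: asyncio.Queue):
    """Stand-in for an LLM stream; put() blocks once the queue is full."""
    for i in range(1000):
        await queue.put(f"token-{i} ")
    await queue.put(None)  # completion signal

async def consume_tokens(queue: asyncio.Queue):
    """Stand-in for a slow client that renders one chunk at a time."""
    while True:
        token = await queue.get()
        if token is None:
            break
        await asyncio.sleep(0.05)  # simulated per-chunk rendering cost

async def main():
    # maxsize bounds memory and propagates backpressure to the producer
    queue: asyncio.Queue = asyncio.Queue(maxsize=50)
    await asyncio.gather(produce_tokens(queue), consume_tokens(queue))

asyncio.run(main())
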
Server-Side Backpressure Handling

import asyncio
import json
import logging
import time
from collections import deque
from typing import AsyncIterator

import websockets

class BackpressureAwareStreamer:
    def __init__(self, max_buffer_size: int = 1000, 
                 flow_control_window: int = 50):
        self.max_buffer_size = max_buffer_size
        self.flow_control_window = flow_control_window
        self.client_ack_buffer = deque()
        self.pending_chunks = 0
        
    async def stream_with_backpressure(self, websocket, llm_response_iterator: AsyncIterator[str]):
        """Stream LLM response with intelligent backpressure management."""
        chunk_id = 0
        send_buffer = deque()
        
        async def send_worker():
            """Async worker that sends chunks respecting flow control."""
            nonlocal chunk_id
            
            while True:
                try:
                    # Wait for data or completion signal
                    if not send_buffer:
                        await asyncio.sleep(0.01)  # Small delay to prevent busy waiting
                        continue
                        
                    # Check if we're within flow control window
                    if self.pending_chunks >= self.flow_control_window:
                        # Wait for acknowledgments before sending more
                        await self.wait_for_acks(websocket)
                        
                    chunk_data = send_buffer.popleft()
                    
                    if chunk_data is None:  # Completion signal
                        await websocket.send(json.dumps({
                            "type": "stream_complete",
                            "final_chunk_id": chunk_id - 1
                        }))
                        break
                        
                    # Send chunk with flow control metadata
                    chunk_id += 1
                    await websocket.send(json.dumps({
                        "type": "content_chunk",
                        "content": chunk_data,
                        "chunk_id": chunk_id,
                        "pending_acks": self.pending_chunks,
                        "requires_ack": True
                    }))
                    
                    self.pending_chunks += 1
                    
                except websockets.exceptions.ConnectionClosed:
                    logging.info("Connection closed during send")
                    break
                except Exception as e:
                    logging.error(f"Send error: {e}")
                    break
        
        async def receive_worker():
            """Handle client acknowledgments and flow control."""
            async for message in websocket:
                try:
                    data = json.loads(message)
                    if data.get('type') == 'chunk_ack':
                        self.handle_chunk_ack(data['chunk_id'])
                except Exception as e:
                    logging.error(f"Ack processing error: {e}")
        
        # Start concurrent workers
        send_task = asyncio.create_task(send_worker())
        receive_task = asyncio.create_task(receive_worker())
        
        try:
            # Feed LLM response into send buffer
            async for chunk in llm_response_iterator:
                # Apply backpressure if buffer is full
                while len(send_buffer) >= self.max_buffer_size:
                    await asyncio.sleep(0.1)
                    logging.warning("Send buffer full, applying backpressure")
                    
                send_buffer.append(chunk)
                
            # Signal completion
            send_buffer.append(None)
            
            # Wait for send completion
            await send_task
            
        finally:
            receive_task.cancel()
            
    def handle_chunk_ack(self, chunk_id: int):
        """Process chunk acknowledgment from client."""
        self.pending_chunks = max(0, self.pending_chunks - 1)
        self.client_ack_buffer.append({
            "chunk_id": chunk_id,
            "ack_time": time.time()
        })
        
        # Clean old acks
        cutoff_time = time.time() - 30  # Keep 30 seconds of ack history
        while (self.client_ack_buffer and 
               self.client_ack_buffer[0]["ack_time"] < cutoff_time):
            self.client_ack_buffer.popleft()
    
    async def wait_for_acks(self, websocket, timeout: float = 5.0):
        """Wait for client acknowledgments to clear flow control window."""
        start_time = time.time()
        
        while self.pending_chunks >= self.flow_control_window:
            if time.time() - start_time > timeout:
                logging.warning("Flow control timeout, forcing continue")
                # Reset pending count to prevent permanent blocking
                self.pending_chunks = 0
                break
                
            await asyncio.sleep(0.1)
            
    def get_flow_control_stats(self) -> dict:
        """Return current flow control statistics."""
        return {
            "pending_chunks": self.pending_chunks,
            "buffer_utilization": len(self.client_ack_buffer),
            "flow_control_window": self.flow_control_window,
            "recent_ack_rate": self.calculate_ack_rate()
        }
        
    def calculate_ack_rate(self) -> float:
        """Calculate recent acknowledgment rate for adaptive flow control."""
        if len(self.client_ack_buffer) < 2:
            return 0.0
            
        recent_acks = [ack for ack in self.client_ack_buffer 
                      if ack["ack_time"] > time.time() - 5]
        
        if len(recent_acks) < 2:
            return 0.0
            
        time_span = recent_acks[-1]["ack_time"] - recent_acks[0]["ack_time"]
        return len(recent_acks) / max(time_span, 0.1)

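Wiring the streamer into a connection handler might look like the sketch below. The token_stream helper is hypothetical and assumes the legacy async OpenAI streaming interface used earlier in this article; treat it as an illustration of how stream_with_backpressure is meant to be called rather than a drop-in server.

import asyncio
import json
import openai
import websockets

async def token_stream(messages: list):
    """Hypothetical adapter that yields plain content strings from the LLM stream."""
    response = await openai.ChatCompletion.acreate(
        model="gpt-4", messages=messages, stream=True
    )
    async for chunk in response:
        content = chunk.choices[0].delta.get("content")
        if content:
            yield content

async def handler(websocket, path):
    # One streamer per connection keeps flow-control state isolated
    streamer = BackpressureAwareStreamer(max_buffer_size=500, flow_control_window=25)
    request = json.loads(await websocket.recv())
    await streamer.stream_with_backpressure(websocket, token_stream(request["messages"]))

start_server = websockets.serve(handler, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
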
Client-Side Flow Control

class FlowControlledStreamClient {
    constructor(websocket) {
        this.websocket = websocket;
        this.processingQueue = [];
        this.maxProcessingQueue = 100;
        this.isProcessing = false;
        this.ackDelay = 50; // ms delay between acks to prevent flooding
        this.lastAckTime = 0;
        
        this.setupMessageHandler();
    }
    
    setupMessageHandler() {
        this.websocket.addEventListener('message', (event) => {
            const data = JSON.parse(event.data);
            
            if (data.type === 'content_chunk') {
                this.handleContentChunk(data);
            }
        });
    }
    
    async handleContentChunk(chunkData) {
        // Add to processing queue
        this.processingQueue.push(chunkData);
        
        // Apply backpressure if queue is full
        if (this.processingQueue.length >= this.maxProcessingQueue) {
            console.warn('Client processing queue full, dropping chunks');
            // Keep only the most recent chunks
            this.processingQueue = this.processingQueue.slice(-this.maxProcessingQueue / 2);
        }
        
        // Start processing if not already running
        if (!this.isProcessing) {
            this.processQueue();
        }
        
        // Send acknowledgment with rate limiting
        if (chunkData.requires_ack) {
            await this.sendAcknowledgment(chunkData.chunk_id);
        }
    }
    
    async processQueue() {
        this.isProcessing = true;
        
        while (this.processingQueue.length > 0) {
            const chunk = this.processingQueue.shift();
            
            try {
                // Simulate processing time (rendering, DOM updates, etc.)
                await this.processChunk(chunk);
                
                // Small delay to prevent overwhelming the browser
                await this.sleep(10);
                
            } catch (error) {
                console.error('Chunk processing error:', error);
                // Continue processing other chunks
            }
        }
        
        this.isProcessing = false;
    }
    
    async processChunk(chunk) {
        // Your actual chunk processing logic here
        // This might involve DOM updates, state changes, etc.
        return new Promise((resolve) => {
            requestAnimationFrame(() => {
                // Update UI
                this.updateDisplay(chunk.content);
                resolve();
            });
        });
    }
    
    async sendAcknowledgment(chunkId) {
        const now = Date.now();
        
        // Rate limit acknowledgments
        if (now - this.lastAckTime < this.ackDelay) {
            return;
        }
        
        this.lastAckTime = now;
        
        try {
            this.websocket.send(JSON.stringify({
                type: 'chunk_ack',
                chunk_id: chunkId,
                client_time: now,
                queue_length: this.processingQueue.length
            }));
        } catch (error) {
            console.error('Failed to send acknowledgment:', error);
        }
    }
    
    sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
    
    updateDisplay(content) {
        // Implement your display update logic
        console.log('Processing chunk:', content);
    }
    
    getClientStats() {
        return {
            queueLength: this.processingQueue.length,
            isProcessing: this.isProcessing,
            maxQueueSize: this.maxProcessingQueue,
            lastAckTime: this.lastAckTime
        };
    }
}

Error Handling and Recovery

Streaming connections are inherently fragile—network issues, server overloads, or client-side problems can interrupt the flow at any time. Robust error handling and recovery mechanisms are essential for production systems.

Comprehensive Error Handling Strategy

from __future__ import annotations  # allow the forward reference to StreamingError below

import asyncio
import json
import logging
import time
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Dict, Any

import openai
import websockets

class StreamState(Enum):
    IDLE = "idle"
    CONNECTING = "connecting" 
    STREAMING = "streaming"
    PAUSED = "paused"
    ERROR = "error"
    COMPLETED = "completed"

@dataclass
class StreamError:
    error_type: str
    message: str
    recoverable: bool
    retry_after: Optional[float] = None
    context: Optional[Dict[str, Any]] = None

class ResilientStreamManager:
    def __init__(self):
        self.state = StreamState.IDLE
        self.retry_counts = {}
        self.max_retries = 3
        self.base_retry_delay = 1.0
        self.recovery_callbacks = {}
        
    async def stream_with_recovery(self, connection_id: str, request_data: dict):
        """Main streaming method with comprehensive error handling."""
        self.state = StreamState.CONNECTING
        retry_count = 0
        
        while retry_count <= self.max_retries:
            try:
                await self.attempt_stream(connection_id, request_data)
                # If we reach here, streaming completed successfully
                self.state = StreamState.COMPLETED
                return
                
            except StreamingError as e:
                await self.handle_streaming_error(connection_id, e, retry_count)
                if not e.recoverable or retry_count >= self.max_retries:
                    self.state = StreamState.ERROR
                    raise e
                    
                retry_count += 1
                delay = self.calculate_retry_delay(retry_count)
                await asyncio.sleep(delay)
                
            except asyncio.CancelledError:
                logging.info(f"Streaming cancelled for {connection_id}")
                self.state = StreamState.IDLE
                raise
                
            except Exception as e:
                # Unexpected error - treat as non-recoverable
                error = StreamingError(
                    error_type="unexpected_error",
                    message=str(e),
                    recoverable=False
                )
                await self.handle_streaming_error(connection_id, error, retry_count)
                self.state = StreamState.ERROR
                raise error
    
    async def attempt_stream(self, connection_id: str, request_data: dict):
        """Single streaming attempt with granular error detection."""
        websocket = self.get_connection(connection_id)
        if not websocket:
            raise StreamingError(
                error_type="connection_lost",
                message="WebSocket connection not found",
                recoverable=True
            )
            
        self.state = StreamState.STREAMING
        chunk_count = 0
        last_chunk_time = time.time()
        
        try:
            # Initialize LLM stream
            llm_stream = await self.create_llm_stream(request_data)
            
            # Send stream start notification
            await self.send_safe(websocket, {
                "type": "stream_start",
                "request_id": connection_id,
                "timestamp": time.time()
            })
            
            async for chunk in llm_stream:
                # Check for timeout between chunks
                current_time = time.time()
                if current_time - last_chunk_time > 30:  # 30 second timeout
                    raise StreamingError(
                        error_type="chunk_timeout",
                        message="No chunks received in 30 seconds",
                        recoverable=True,
                        context={"last_chunk_time": last_chunk_time, "chunk_count": chunk_count}
                    )
                
                # Process and send chunk
                processed_chunk = await self.process_chunk(chunk, chunk_count)
                await self.send_safe(websocket, processed_chunk)
                
                chunk_count += 1
                last_chunk_time = current_time
                
                # Health check every 50 chunks
                if chunk_count % 50 == 0:
                    await self.health_check(websocket, connection_id)
                    
        except openai.error.RateLimitError as e:
            raise StreamingError(
                error_type="rate_limit",
                message="OpenAI API rate limit exceeded", 
                recoverable=True,
                retry_after=60.0,  # Wait 60 seconds
                context={"openai_error": str(e)}
            )
            
        except openai.error.APIError as e:
            # Determine if API error is recoverable
            recoverable = "server_error" in str(e).lower() or "503" in str(e)
            raise StreamingError(
                error_type="api_error",
                message=f"OpenAI API error: {str(e)}",
                recoverable=recoverable,
                context={"openai_error": str(e)}
            )
            
        except websockets.exceptions.ConnectionClosed:
            raise StreamingError(
                error_type="connection_closed",
                message="WebSocket connection closed unexpectedly",
                recoverable=True,
                context={"chunks_sent": chunk_count}
            )
            
    async def handle_streaming_error(self, connection_id: str, 
                                   error: StreamingError, retry_count: int):
        """Comprehensive error handling with context preservation."""
        logging.error(f"Streaming error for {connection_id}: {error.message}")
        
        # Update retry tracking
        self.retry_counts[connection_id] = retry_count
        
        # Send error notification to client if connection still exists
        websocket = self.get_connection(connection_id)
        if websocket:
            try:
                await self.send_safe(websocket, {
                    "type": "stream_error",
                    "error_type": error.error_type,
                    "message": error.message,
                    "recoverable": error.recoverable,
                    "retry_count": retry_count,
                    "retry_after": error.retry_after,
                    "context": error.context
                })
            except:
                # If we can't send error notification, connection is truly lost
                pass
        
        # Execute recovery callbacks
        recovery_callback = self.recovery_callbacks.get(error.error_type)
        if recovery_callback:
            try:
                await recovery_callback(connection_id, error, retry_count)
            except Exception as callback_error:
                logging.error(f"Recovery callback failed: {callback_error}")
    
    async def send_safe(self, websocket, data: dict):
        """Safe WebSocket send with connection validation."""
        if websocket.closed:
            raise StreamingError(
                error_type="connection_closed",
                message="Cannot send to closed WebSocket",
                recoverable=True
            )
            
        try:
            await websocket.send(json.dumps(data))
        except websockets.exceptions.ConnectionClosed:
            raise StreamingError(
                error_type="connection_closed", 
                message="Connection closed during send",
                recoverable=True
            )
    
    def calculate_retry_delay(self, retry_count: int) -> float:
        """Exponential backoff with jitter."""
        base_delay = self.base_retry_delay * (2 ** retry_count)
        # Add jitter to prevent thundering herd
        jitter = base_delay * 0.1 * (time.time() % 1)
        return min(base_delay + jitter, 60.0)  # Cap at 60 seconds
    
    def register_recovery_callback(self, error_type: str, callback):
        """Register custom recovery logic for specific error types."""
        self.recovery_callbacks[error_type] = callback

class StreamingError(Exception):
    def __init__(self, error_type: str, message: str, recoverable: bool = True, 
                 retry_after: Optional[float] = None, context: Optional[Dict] = None):
        self.error_type = error_type
        self.message = message
        self.recoverable = recoverable
        self.retry_after = retry_after
        self.context = context or {}
        super().__init__(message)

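Usage is intentionally thin: register callbacks for the error types you care about, then call stream_with_recovery from your request handler. The sketch below is illustrative; the commented-out alerting call and the handler context are placeholders, not part of the class above.

import logging

manager = ResilientStreamManager()

async def on_rate_limit(connection_id: str, error: StreamingError, retry_count: int):
    """Custom recovery hook for rate-limit errors."""
    logging.warning(
        f"Rate limited on {connection_id} (attempt {retry_count}), "
        f"retrying after {error.retry_after}s"
    )
    # notify_ops("rate_limit", connection_id)  # placeholder for real alerting

manager.register_recovery_callback("rate_limit", on_rate_limit)

# Inside the request handler:
# await manager.stream_with_recovery(connection_id, {"messages": messages, "model": "gpt-4"})
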
Client-Side Recovery and Reconnection

class ResilientStreamingClient {
    constructor(wsUrl, options = {}) {
        this.wsUrl = wsUrl;
        this.options = {
            maxRetries: 5,
            initialRetryDelay: 1000,
            maxRetryDelay: 30000,
            heartbeatInterval: 30000,
            ...options
        };
        
        this.state = 'disconnected';
        this.retryCount = 0;
        this.websocket = null;
        this.heartbeatTimer = null;
        this.reconnectTimer = null;
        
        // Stream state preservation
        this.streamBuffer = [];
        this.lastProcessedChunk = -1;
        this.streamMetadata = {};
        
        this.eventHandlers = new Map();
    }
    
    async connect() {
        if (this.state === 'connecting' || this.state === 'connected') {
            return;
        }
        
        this.state = 'connecting';
        this.emit('stateChange', { state: this.state });
        
        try {
            this.websocket = new WebSocket(this.wsUrl);
            this.setupWebSocketHandlers();
            
            // Wait for connection
            await new Promise((resolve, reject) => {
                const timeout = setTimeout(() => {
                    reject(new Error('Connection timeout'));
                }, 10000);
                
                this.websocket.addEventListener('open', () => {
                    clearTimeout(timeout);
                    resolve();
                });
                
                this.websocket.addEventListener('error', (error) => {
                    clearTimeout(timeout);
                    reject(error);
                });
            });
            
            this.state = 'connected';
            this.retryCount = 0;
            this.startHeartbeat();
            this.emit('connected');
            
        } catch (error) {
            this.state = 'disconnected';
            this.emit('connectionError', { error, retryCount: this.retryCount });
            await this.handleConnectionFailure(error);
        }
    }
    
    setupWebSocketHandlers() {
        this.websocket.addEventListener('message', (event) => {
            this.handleMessage(JSON.parse(event.data));
        });
        
        this.websocket.addEventListener('close', (event) => {
            this.handleConnectionClose(event);
        });
        
        this.websocket.addEventListener('error', (error) => {
            this.handleConnectionError(error);
        });
    }
    
    handleMessage(data) {
        switch (data.type) {
            case 'stream_start':
                this.handleStreamStart(data);
                break;
                
            case 'content_chunk':
                this.handleContentChunk(data);
                break;
                
            case 'stream_error':
                this.handleStreamError(data);
                break;
                
            case 'stream_complete':
                this.handleStreamComplete(data);
                break;
                
            case 'heartbeat':
                this.handleHeartbeat(data);
                break;
                
            default:
                this.emit('unknownMessage', data);
        }
    }
    
    handleContentChunk(data) {
        // Check for missing chunks
        if (data.chunk_id !== undefined && 
            data.chunk_id !== this.lastProcessedChunk + 1) {
            this.emit('chunkGap', {
                expected: this.lastProcessedChunk + 1,
                received: data.chunk_id
            });
            
            // Request missing chunks if possible
            this.requestMissingChunks(this.lastProcessedChunk + 1, data.chunk_id - 1);
        }
        
        // Buffer chunk for processing
        this.streamBuffer.push(data);
        this.lastProcessedChunk = data.chunk_id ?? this.lastProcessedChunk + 1;
        
        // Process buffered chunks in order
        this.processBufferedChunks();
        
        this.emit('contentChunk', data);
    }
    
    handleStreamError(data) {
        this.emit('streamError', data);
        
        if (data.recoverable && this.retryCount < this.options.maxRetries) {
            const retryDelay = data.retry_after ? 
                data.retry_after * 1000 : 
                this.calculateRetryDelay();
            
            setTimeout(() => {
                this.attemptRecovery(data);
            }, retryDelay);
        } else {
            this.state = 'failed';
            this.emit('streamFailed', data);
        }
    }
    
    async attemptRecovery(errorData) {
        this.emit('recoveryAttempt', { retryCount: this.retryCount });
        
        if (errorData.error_type === 'connection_closed') {
            // Reconnect and resume stream
            await this.connect();
            if (this.state === 'connected') {
                await this.resumeStream();
            }
        } else if (errorData.error_type === 'rate_limit') {
            // Wait and retry the original request
            await this.retryCurrentRequest();
        } else {
            // Generic recovery attempt
            await this.connect();
        }
    }
    
    async resumeStream() {
        // Request to resume from last processed chunk
        this.send({
            type: 'resume_stream',
            last_chunk_id: this.lastProcessedChunk,
            stream_metadata: this.streamMetadata
        });
    }
    
    handleConnectionClose(event) {
        this.stopHeartbeat();
        
        if (this.state === 'connected') {
            this.state = 'disconnected';
            this.emit('connectionLost', { code: event.code, reason: event.reason });
            
            // Attempt automatic reconnection
            this.scheduleReconnect();
        }
    }
    
    scheduleReconnect() {
        if (this.retryCount >= this.options.maxRetries) {
            this.state = 'failed';
            this.emit('maxRetriesExceeded');
            return;
        }
        
        const delay = this.calculateRetryDelay();
        this.retryCount++;
        
        this.reconnectTimer = setTimeout(() => {
            this.connect();
        }, delay);
    }
    
    calculateRetryDelay() {
        const delay = Math.min(
            this.options.initialRetryDelay * Math.pow(2, this.retryCount),
            this.options.maxRetryDelay
        );
        
        // Add jitter
        return delay + (Math.random() * delay * 0.1);
    }
    
    startHeartbeat() {
        this.heartbeatTimer = setInterval(() => {
            if (this.state === 'connected') {
                this.send({ type: 'heartbeat', timestamp: Date.now() });
            }
        }, this.options.heartbeatInterval);
    }
    
    stopHeartbeat() {
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
    }
    
    send(data) {
        if (this.state !== 'connected' || !this.websocket) {
            throw new Error('Not connected');
        }
        
        this.websocket.send(JSON.stringify(data));
    }
    
    // Event system
    on(event, handler) {
        if (!this.eventHandlers.has(event)) {
            this.eventHandlers.set(event, new Set());
        }
        this.eventHandlers.get(event).add(handler);
    }
    
    emit(event, data) {
        const handlers = this.eventHandlers.get(event);
        if (handlers) {
            handlers.forEach(handler => {
                try {
                    handler(data);
                } catch (error) {
                    console.error(`Error in event handler for ${event}:`, error);
                }
            });
        }
    }
    
    disconnect() {
        if (this.reconnectTimer) {
            clearTimeout(this.reconnectTimer);
        }
        this.stopHeartbeat();
        
        if (this.websocket) {
            this.websocket.close();
        }
        
        this.state = 'disconnected';
        this.emit('disconnected');
    }
}

Performance Optimization and Scaling

Production streaming systems must handle hundreds or thousands of concurrent streams while maintaining low latency and high throughput. This requires careful attention to performance bottlenecks and scaling patterns.

High-Performance Server Architecture

import asyncio
import json
import logging
import time
import weakref
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Set

import psutil
import uvloop
import websockets

@dataclass
class StreamMetrics:
    total_streams: int = 0
    active_streams: int = 0
    bytes_sent: int = 0
    chunks_sent: int = 0
    avg_chunk_size: float = 0.0
    peak_concurrent_streams: int = 0
    error_count: int = 0
    start_time: float = field(default_factory=time.time)

class HighPerformanceStreamServer:
    def __init__(self, max_concurrent_streams: int = 1000):
        self.max_concurrent_streams = max_concurrent_streams
        self.active_streams: Dict[str, 'StreamContext'] = {}
        self.connection_pool = weakref.WeakSet()
        self.metrics = StreamMetrics()
        
        # Performance optimization settings
        self.chunk_batch_size = 10
        self.send_queue_size = 1000
        self.memory_pressure_threshold = 0.85
        
        # Load balancing
        self.worker_pools = {}
        self.current_worker = 0
        
        # Initialize event loop with optimizations
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        
    async def start_optimized_server(self, host: str = "localhost", port: int = 8765):
        """Start server with performance optimizations."""
        
        # Configure asyncio for high performance
        loop = asyncio.get_event_loop()
        loop.set_debug(False)  # Disable debug mode for production
        
        # Create optimized WebSocket server
        server = await websockets.serve(
            self.handle_connection,
            host,
            port,
            # Performance optimizations
            max_size=10**7,  # 10MB max message size
            max_queue=100,   # Limit queued messages
            compression=None,  # Disable compression for speed
            ping_interval=30,
            ping_timeout=10,
            # Use SO_REUSEPORT for better load distribution
            reuse_port=True
        )
        
        # Start background tasks
        asyncio.create_task(self.metrics_collector())
        asyncio.create_task(self.memory_monitor())
        asyncio.create_task(self.connection_cleaner())
        
        logging.info(f"High-performance streaming server started on {host}:{port}")
        return server
    
    async def handle_connection(self, websocket, path):
        """Optimized connection handler with resource management."""
        
        # Check if we're at capacity
        if len(self.active_streams) >= self.max_concurrent_streams:
            await websocket.close(code=1013, reason="Server at capacity")
            return
            
        connection_id = self.generate_connection_id()
        stream_context = StreamContext(connection_id, websocket)
        
        try:
            self.active_streams[connection_id] = stream_context
            self.connection_pool.add(websocket)
            self.metrics.active_streams += 1
            self.metrics.peak_concurrent_streams = max(
                self.metrics.peak_concurrent_streams,
                self.metrics.active_streams
            )
            
            await self.process_stream_optimized(stream_context)
            
        except Exception as e:
            self.metrics.error_count += 1
            logging.error(f"Stream error for {connection_id}: {e}")
        finally:
            self.cleanup_stream(connection_id)
    
    async def process_stream_optimized(self, context: 'StreamContext'):
        """Optimized streaming with batching and memory management."""
        
        send_queue = asyncio.Queue(maxsize=self.send_queue_size)
        
        async def optimized_sender():
            """Batched sender to reduce syscalls."""
            done = False

            while not done:
                try:
                    batch = []

                    # Collect a batch of chunks, stopping on the completion signal
                    while len(batch) < self.chunk_batch_size:
                        try:
                            chunk = await asyncio.wait_for(
                                send_queue.get(), timeout=0.1
                            )
                            if chunk is None:  # Completion signal
                                done = True
                                break
                            batch.append(chunk)
                        except asyncio.TimeoutError:
                            break

                    if not batch:
                        continue

                    # Send batch efficiently
                    if len(batch) == 1:
                        # Single chunk - send directly
                        await context.websocket.send(batch[0])
                    else:
                        # Multiple chunks - batch into single message
                        batched_message = json.dumps({
                            "type": "chunk_batch",
                            "chunks": [json.loads(msg) for msg in batch]
                        })
                        await context.websocket.send(batched_message)

                    # Update metrics
                    for sent_chunk in batch:
                        self.metrics.bytes_sent += len(sent_chunk)
                        self.metrics.chunks_sent += 1

                except websockets.exceptions.ConnectionClosed:
                    break
                except Exception as e:
                    logging.error(f"Sender error: {e}")
                    break
        
        async def llm_processor():
            """Process LLM stream with memory-efficient handling."""
            try:
                # Get LLM response stream
                llm_stream = await self.get_llm_stream(context.request_data)
                
                chunk_buffer = []
                buffer_size = 0
                max_buffer_size = 8192  # 8KB buffer
                
                async for raw_chunk in llm_stream:
                    chunk_data = self.process_chunk(raw_chunk)
                    chunk_json = json.dumps(chunk_data)
                    
                    chunk_buffer.append(chunk_json)
                    buffer_size += len(chunk_json)
                    
                    # Flush buffer when size limit reached
                    if buffer_size >= max_buffer_size:
                        for buffered_chunk in chunk_buffer:
                            await send_queue.put(buffered_chunk)
                        chunk_buffer.clear()
                        buffer_size = 0
                        
                        # Check memory pressure
                        if self.is_memory_pressure():
                            await asyncio.sleep(0.01)  # Brief pause
                
                # Flush remaining buffer
                for buffered_chunk in chunk_buffer:
                    await send_queue.put(buffered_chunk)
                
                # Signal completion
                await send_queue.put(None)
                
            except Exception as e:
                logging.error(f"LLM processing error: {e}")
                await send_queue.put(json.dumps({
                    "type": "error",
                    "message": str(e)
                }))
                await send_queue.put(None)  # Also signal completion so the sender can exit
        
        # Run sender and processor concurrently
        sender_task = asyncio.create_task(optimized_sender())
        processor_task = asyncio.create_task(llm_processor())
        
        await asyncio.gather(sender_task, processor_task, return_exceptions=True)
    
    def is_memory_pressure(self) -> bool:
        """Check if system is under memory pressure."""
        memory_percent = psutil.virtual_memory().percent / 100
        return memory_percent > self.memory_pressure_threshold
    
    async def memory_monitor(self):
        """Background task to monitor and manage memory usage."""
        while True:
            try:
                if self.is_memory_pressure():
                    # Implement memory pressure relief
                    await self.handle_memory_pressure()
                
                await asyncio.sleep(5)  # Check every 5 seconds
            except Exception as e:
                logging.error(f"Memory monitor error: {e}")
                await asyncio.sleep(5)  # Back off so a persistent error can't spin the loop
    
    async def handle_memory_pressure(self):
        """Handle memory pressure by throttling or dropping connections."""
        logging.warning("Memory pressure detected, implementing relief measures")
        
        # Sort streams by age, close oldest ones
        sorted_streams = sorted(
            self.active_streams.items(),
            key=lambda x: x[1].start_time
        )
        
        # Close 10% of oldest streams
        to_close = max(1, len(sorted_streams) // 10)
        
        for connection_id, context in sorted_streams[:to_close]:
            try:
                await context.websocket.close(
                    code=1000, 
                    reason="Server memory pressure"
                )
                logging.info(f"Closed stream {connection_id} due to memory pressure")
            except Exception:
                pass  # Connection might already be closed
    
    async def metrics_collector(self):
        """Collect and log performance metrics."""
        while True:
            try:
                current_time = time.time()
                uptime = current_time - self.metrics.start_time
                
                if self.metrics.chunks_sent > 0:
                    self.metrics.avg_chunk_size = (
                        self.metrics.bytes_sent / self.metrics.chunks_sent
                    )
                
                # Log metrics every minute
                logging.info(f"Streaming Metrics - "
                           f"Active: {self.metrics.active_streams}, "
                           f"Peak: {self.metrics.peak_concurrent_streams}, "
                           f"Total Chunks: {self.metrics.chunks_sent}, "
                           f"Avg Size: {self.metrics.avg_chunk_size:.1f}B, "
                           f"Errors: {self.metrics.error_count}, "
                           f"Uptime: {uptime:.1f}s")
                
                await asyncio.sleep(60)
            except Exception as e:
                logging.error(f"Metrics collector error: {e}")
                await asyncio.sleep(60)  # Back off so a persistent error can't spin the loop

@dataclass
class StreamContext:
    connection_id: str
    websocket: websockets.WebSocketServerProtocol
    start_time: float = field(default_factory=time.time)
    request_data: dict = field(default_factory=dict)
    bytes_sent: int = 0
    chunks_sent: int = 0

Client-Side Performance Optimization

class OptimizedStreamingClient {
    constructor(container, options = {}) {
        // DOM element that streamed chunks are appended to (used by renderBatch below)
        this.container = container;
        
        this.options = {
            bufferSize: 50,
            renderBatchSize: 5,
            maxRenderFreq: 60, // Max FPS for rendering
            enableVirtualScrolling: true,
            ...options
        };
        
        this.renderQueue = [];
        this.isRendering = false;
        this.lastRenderTime = 0;
        this.virtualScrollOffset = 0;
        
        // Performance monitoring
        this.perfMetrics = {
            chunksReceived: 0,
            chunksRendered: 0,
            avgRenderTime: 0,
            droppedFrames: 0
        };
        
        // Use requestAnimationFrame for smooth rendering
        this.frameId = null;
    }
    
    handleContentChunk(chunk) {
        this.perfMetrics.chunksReceived++;
        
        // Add to render queue
        this.renderQueue.push(chunk);
        
        // Limit queue size to prevent memory issues
        if (this.renderQueue.length > this.options.bufferSize) {
            this.renderQueue = this.renderQueue.slice(-this.options.bufferSize);
            this.perfMetrics.droppedFrames++;
        }
        
        // Schedule rendering if not already scheduled
        this.scheduleRender();
    }
    
    scheduleRender() {
        if (this.frameId) {
            return; // Already scheduled
        }
        
        this.frameId = requestAnimationFrame(() => {
            // Clear the handle first so performBatchRender can reschedule itself
            this.frameId = null;
            this.performBatchRender();
        });
    }
    
    performBatchRender() {
        const startTime = performance.now();
        const targetFrameTime = 1000 / this.options.maxRenderFreq;
        
        // Check if enough time has passed since last render
        if (startTime - this.lastRenderTime < targetFrameTime) {
            // Schedule for next frame
            this.scheduleRender();
            return;
        }
        
        try {
            // Process batch of chunks
            const batchSize = Math.min(
                this.options.renderBatchSize,
                this.renderQueue.length
            );
            
            const batch = this.renderQueue.splice(0, batchSize);
            
            if (batch.length > 0) {
                this.renderBatch(batch);
                this.perfMetrics.chunksRendered += batch.length;
            }
            
            // Continue if more chunks waiting
            if (this.renderQueue.length > 0) {
                this.scheduleRender();
            }
            
        } finally {
            const endTime = performance.now();
            const renderTime = endTime - startTime;
            
            // Update performance metrics
            this.perfMetrics.avgRenderTime = (
                (this.perfMetrics.avgRenderTime * 0.9) + (renderTime * 0.1)
            );
            
            this.lastRenderTime = endTime;
        }
    }
    
    renderBatch(chunks) {
        // Efficient DOM manipulation using document fragments
        const fragment = document.createDocumentFragment();
        
        for (const chunk of chunks) {
            const element = this.createChunkElement(chunk);
            fragment.appendChild(element);
        }
        
        // Single DOM update
        this.container.appendChild(fragment);
        
        // Update virtual scrolling if enabled
        if (this.options.enableVirtualScrolling) {
            this.updateVirtualScrolling();
        }
        
        // Trigger syntax highlighting in batches
        this.scheduleHighlighting();
    }
    
    updateVirtualScrolling() {
        const containerHeight = this.container.clientHeight;
        const scrollTop = this.container.scrollTop;
        const itemHeight = 20; // Approximate line height
        
        const visibleStart = Math.floor(scrollTop / itemHeight);
        const visibleEnd = visibleStart + Math.ceil(containerHeight / itemHeight);
        
        // Hide elements outside visible range
        const children = this.container.children;
        for (let i = 0; i < children.length; i++) {
            const child = children[i];
            if (i < visibleStart || i > visibleEnd) {
                child.style.display = 'none';
            } else {
                child.style.display = '';
            }
        }
    }
    
    scheduleHighlighting() {
        // Debounced syntax highlighting to avoid performance issues
        clearTimeout(this.highlightTimeout);
        this.highlightTimeout = setTimeout(() => {
            this.performHighlighting();
        }, 100);
    }
    
    performHighlighting() {
        // Use Intersection Observer for efficient highlighting
        if (!this.highlightObserver) {
            this.highlightObserver = new IntersectionObserver((entries) => {
                entries.forEach(entry => {
                    if (entry.isIntersecting && !entry.target.hasAttribute('highlighted')) {
                        this.highlightElement(entry.target);
                        // Mark with an attribute so the :not([highlighted]) selector below skips it
                        entry.target.setAttribute('highlighted', 'true');
                    }
                });
            }, { threshold: 0.1 });
        }
        
        // Observe code blocks for highlighting
        const codeBlocks = this.container.querySelectorAll('pre:not([highlighted])');
        codeBlocks.forEach(block => {
            this.highlightObserver.observe(block);
        });
    }
    
    highlightElement(element) {
        // Async syntax highlighting to avoid blocking
        setTimeout(() => {
            if (typeof Prism !== 'undefined') {
                Prism.highlightElement(element);
            }
        }, 0);
    }
    
    // Performance monitoring
    getPerformanceMetrics() {
        return {
            ...this.perfMetrics,
            queueSize: this.renderQueue.length,
            memoryUsage: this.estimateMemoryUsage()
        };
    }
    
    estimateMemoryUsage() {
        // Rough estimate of queued-but-unrendered content (UTF-16 strings ~2 bytes per char)
        return this.renderQueue.reduce(
            (bytes, chunk) => bytes + JSON.stringify(chunk).length * 2,
            0
        );
    }
}
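
To tie the two halves together, the client's message handler has to understand both shapes the server's optimized sender emits: a single chunk object, or a chunk_batch envelope wrapping several chunks. The sketch below is a minimal, hypothetical wiring example rather than part of the class above: the WebSocket URL, the #response container, and the 10-second metrics interval are illustrative assumptions, and renderBatch() still relies on a createChunkElement() helper implemented for your chunk format.

const container = document.querySelector('#response');
const client = new OptimizedStreamingClient(container, { renderBatchSize: 10 });

// Illustrative endpoint - adjust to your streaming server
const socket = new WebSocket('wss://example.com/ws/stream');

socket.onmessage = (event) => {
    const message = JSON.parse(event.data);

    if (message.type === 'chunk_batch') {
        // The server coalesced several chunks into one frame - unpack them
        message.chunks.forEach(chunk => client.handleContentChunk(chunk));
    } else if (message.type === 'error') {
        console.error('Stream error:', message.message);
    } else {
        // A single chunk was sent directly
        client.handleContentChunk(message);
    }
};

// Periodically inspect the counters exposed by getPerformanceMetrics()
setInterval(() => {
    console.debug('Streaming client metrics:', client.getPerformanceMetrics());
}, 10000);

Because coalescing happens on the server and again in the client's render queue, handleContentChunk() stays the single entry point regardless of which path a chunk took, and the requestAnimationFrame throttle keeps DOM updates bounded even when batches arrive in bursts.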
