
You're building a customer service chatbot that needs to feel conversational and responsive. Users type a question, hit send, and... wait. Ten seconds pass. Fifteen. Finally, a complete response appears all at once. The experience feels clunky, artificial, and frankly, broken.
Now imagine the same interaction, but as soon as the user hits send, words begin appearing one by one, just like a human typing in real-time. The user stays engaged, can start reading immediately, and gets that satisfying sense of a living conversation. This is the power of streaming responses in AI interfaces.
Building streaming AI interfaces isn't just about better user experience—it's about handling the fundamental challenge of large language models: they're computationally expensive and can take significant time to generate complete responses. Rather than making users wait for the entire process to complete, streaming lets you deliver value incrementally while the AI continues working.
What you'll learn: how streaming differs from standard request/response calls, how to consume Server-Sent Events from an LLM API, how to build a Flask backend and a browser frontend that render tokens as they arrive, how to handle connection failures, cancellation, and performance monitoring, and how to combine streaming with retrieval in a document Q&A system.
You should be comfortable with: Python and basic Flask routing, browser JavaScript (the fetch API and DOM manipulation), and making calls to an LLM API such as OpenAI's.
Traditional AI API calls work like ordering at a restaurant: you place your order, wait, and receive your complete meal all at once. Streaming is more like watching a chef cook in an open kitchen—you see each ingredient added, each step completed, building toward the final dish.
Most modern LLM APIs support Server-Sent Events (SSE) for streaming. Instead of waiting for the complete response, the model sends individual tokens (pieces of words or complete words) as they're generated. Your application receives these tokens in real-time and can display them immediately.
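Concretely, each message the server pushes over SSE is just a "data:" line followed by a blank line, and the backend examples later in this chapter format their events exactly this way. A tiny helper makes the framing explicit (the name sse_event is mine, not from any SDK):
import json

def sse_event(payload: dict) -> str:
    # Server-Sent Events framing: one "data:" line per message,
    # terminated by a blank line.
    return f"data: {json.dumps(payload)}\n\n"

# e.g. sse_event({"type": "token", "token": "Hel"})
# -> 'data: {"type": "token", "token": "Hel"}\n\n'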
Here's how a basic streaming request differs from a standard one:
import openai
import json
# Standard (non-streaming) request
def get_complete_response(prompt):
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=500
)
return response.choices[0].message.content
# Streaming request
def get_streaming_response(prompt):
stream = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=500,
stream=True # This enables streaming
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
The streaming version returns a generator that yields individual content pieces. Each chunk contains a small fragment of the complete response—sometimes a full word, sometimes part of a word, depending on the tokenization.
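If you want to see the stream in a terminal before building any UI, you can consume the generator directly. A minimal sketch, assuming OPENAI_API_KEY is set in your environment:
if __name__ == "__main__":
    # Print tokens as they arrive so the output "types itself" in the terminal.
    for token in get_streaming_response("Explain streaming responses in one sentence."):
        print(token, end="", flush=True)
    print()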
Let's build a practical streaming chat interface step by step. We'll start with a Python backend using Flask and create a frontend that displays tokens as they arrive.
First, create a Flask application that handles streaming requests:
from flask import Flask, request, Response, render_template
import openai
import json
import os
from datetime import datetime
app = Flask(__name__)
openai.api_key = os.getenv('OPENAI_API_KEY')
@app.route('/')
def index():
return render_template('chat.html')
@app.route('/stream-chat', methods=['POST'])
def stream_chat():
user_message = request.json.get('message')
conversation_history = request.json.get('history', [])
# Build the complete conversation context
messages = conversation_history + [
{"role": "user", "content": user_message}
]
def generate_response():
try:
stream = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=1000,
temperature=0.7,
stream=True
)
complete_response = ""
for chunk in stream:
if chunk.choices[0].delta.content is not None:
token = chunk.choices[0].delta.content
complete_response += token
# Send each token as a Server-Sent Event
yield f"data: {json.dumps({'token': token, 'type': 'token'})}\n\n"
# Send completion signal
yield f"data: {json.dumps({'type': 'complete', 'full_response': complete_response})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
return Response(
generate_response(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
app.run(debug=True)
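Before wiring up the frontend, you can sanity-check the endpoint from another terminal. Here is a small test client built on the requests library; this is a sketch that assumes the Flask app above is running locally on port 5000:
import json
import requests

# Stream /stream-chat and print tokens as they arrive.
with requests.post(
    "http://localhost:5000/stream-chat",
    json={"message": "Hello!", "history": []},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data: "):
            event = json.loads(line[len("data: "):])
            if event.get("type") == "token":
                print(event["token"], end="", flush=True)
            elif event.get("type") == "complete":
                print()  # newline after the full response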
Create a responsive HTML interface that handles the streaming data:
<!DOCTYPE html>
<html>
<head>
<title>Streaming AI Chat</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
background-color: #f5f5f5;
}
.chat-container {
background: white;
border-radius: 10px;
padding: 20px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.messages {
height: 400px;
overflow-y: auto;
border: 1px solid #ddd;
padding: 15px;
margin-bottom: 20px;
background-color: #fafafa;
border-radius: 8px;
}
.message {
margin-bottom: 15px;
padding: 10px;
border-radius: 8px;
}
.user-message {
background-color: #007bff;
color: white;
margin-left: 50px;
}
.ai-message {
background-color: #e9ecef;
color: #333;
margin-right: 50px;
}
.input-container {
display: flex;
gap: 10px;
}
.input-container input {
flex: 1;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
font-size: 16px;
}
.input-container button {
padding: 10px 20px;
background-color: #007bff;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 16px;
}
.input-container button:disabled {
background-color: #6c757d;
cursor: not-allowed;
}
.typing-indicator {
font-style: italic;
color: #666;
margin-top: 5px;
}
.cursor {
animation: blink 1s infinite;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
</style>
</head>
<body>
<div class="chat-container">
<h1>Streaming AI Assistant</h1>
<div class="messages" id="messages"></div>
<div class="input-container">
<input type="text" id="messageInput" placeholder="Type your message...">
<button id="sendButton" onclick="sendMessage()">Send</button>
</div>
</div>
<script>
let conversationHistory = [];
let currentEventSource = null;
let currentMessageElement = null;
function addMessage(content, isUser = false) {
const messagesContainer = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = `message ${isUser ? 'user-message' : 'ai-message'}`;
messageDiv.textContent = content;
messagesContainer.appendChild(messageDiv);
messagesContainer.scrollTop = messagesContainer.scrollHeight;
return messageDiv;
}
function sendMessage() {
const input = document.getElementById('messageInput');
const sendButton = document.getElementById('sendButton');
const message = input.value.trim();
if (!message) return;
// Add user message to UI and history
addMessage(message, true);
conversationHistory.push({"role": "user", "content": message});
// Clear input and disable send button
input.value = '';
sendButton.disabled = true;
// Create AI message container
currentMessageElement = addMessage('', false);
currentMessageElement.innerHTML = '<span class="typing-indicator">AI is typing<span class="cursor">|</span></span>';
// Start streaming response
startStreaming(message);
}
function startStreaming(message) {
// EventSource only supports GET requests, so we POST with fetch and read
// the response body as a stream instead.
fetch('/stream-chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: message,
history: conversationHistory
})
})
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let aiResponse = '';
function readChunk() {
return reader.read().then(({ done, value }) => {
if (done) {
finishResponse(aiResponse);
return;
}
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (let line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if (data.type === 'token') {
aiResponse += data.token;
// Update the message and keep a blinking cursor at the end
currentMessageElement.innerHTML = aiResponse + '<span class="cursor">|</span>';
} else if (data.type === 'complete') {
finishResponse(data.full_response);
return;
} else if (data.type === 'error') {
handleError(data.error);
return;
}
} catch (e) {
console.error('Error parsing SSE data:', e);
}
}
}
return readChunk();
});
}
return readChunk();
})
.catch(error => {
handleError(error.message);
});
}
function finishResponse(fullResponse) {
// Remove typing indicator and cursor
currentMessageElement.textContent = fullResponse;
// Add to conversation history
conversationHistory.push({"role": "assistant", "content": fullResponse});
// Re-enable send button
document.getElementById('sendButton').disabled = false;
document.getElementById('messageInput').focus();
}
function handleError(errorMessage) {
currentMessageElement.textContent = `Error: ${errorMessage}`;
currentMessageElement.style.color = 'red';
document.getElementById('sendButton').disabled = false;
}
// Handle Enter key in input
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
// Focus input on page load
window.onload = function() {
document.getElementById('messageInput').focus();
};
</script>
</body>
</html>
This implementation creates a complete streaming chat interface where tokens appear in real-time as the AI generates them. The typing indicator and blinking cursor provide clear visual feedback about the system's state.
Real-world streaming applications need robust connection handling. Network interruptions, timeouts, and API rate limits can break streams unexpectedly. Here's an enhanced version that handles these challenges:
import time
import threading
from flask import Flask, request, Response, jsonify
import openai
import json
import uuid
class StreamingManager:
def __init__(self):
self.active_streams = {}
self.stream_timeouts = {}
def create_stream(self, stream_id, messages, max_tokens=1000):
def generate_with_recovery():
try:
start_time = time.time()
stream = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=max_tokens,
temperature=0.7,
stream=True,
timeout=30 # API timeout
)
complete_response = ""
last_token_time = time.time()
for chunk in stream:
# Check if stream was cancelled
if stream_id not in self.active_streams:
yield f"data: {json.dumps({'type': 'cancelled'})}\n\n"
return
if chunk.choices[0].delta.content is not None:
token = chunk.choices[0].delta.content
complete_response += token
last_token_time = time.time()
yield f"data: {json.dumps({
'token': token,
'type': 'token',
'stream_id': stream_id,
'timestamp': last_token_time
})}\n\n"
# Check for timeout (no tokens for 30 seconds)
if time.time() - last_token_time > 30:
yield f"data: {json.dumps({'type': 'timeout'})}\n\n"
return
# Stream completed successfully
total_time = time.time() - start_time
yield f"data: {json.dumps({
'type': 'complete',
'full_response': complete_response,
'duration': total_time,
'token_count': len(complete_response.split())
})}\n\n"
except Exception as e:
yield f"data: {json.dumps({
'type': 'error',
'error': str(e),
'error_type': type(e).__name__
})}\n\n"
finally:
# Clean up
if stream_id in self.active_streams:
del self.active_streams[stream_id]
if stream_id in self.stream_timeouts:
del self.stream_timeouts[stream_id]
self.active_streams[stream_id] = True
return generate_with_recovery()
def cancel_stream(self, stream_id):
if stream_id in self.active_streams:
del self.active_streams[stream_id]
return True
return False
streaming_manager = StreamingManager()
@app.route('/stream-chat', methods=['POST'])
def stream_chat():
user_message = request.json.get('message')
conversation_history = request.json.get('history', [])
stream_id = str(uuid.uuid4())
messages = conversation_history + [
{"role": "user", "content": user_message}
]
return Response(
streaming_manager.create_stream(stream_id, messages),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Stream-ID': stream_id
}
)
@app.route('/cancel-stream/<stream_id>', methods=['POST'])
def cancel_stream(stream_id):
success = streaming_manager.cancel_stream(stream_id)
return jsonify({'cancelled': success})
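On the client side, cancellation uses the X-Stream-ID header that the streaming endpoint returns. A hypothetical flow from Python (the endpoint names match the routes above; the URL and timing are illustrative):
import requests

resp = requests.post(
    "http://localhost:5000/stream-chat",
    json={"message": "Write a very long story", "history": []},
    stream=True,
)
stream_id = resp.headers.get("X-Stream-ID")

# ... read a few events, then the user hits "stop" ...
requests.post(f"http://localhost:5000/cancel-stream/{stream_id}")
resp.close()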
Production streaming applications need monitoring to track performance and identify issues:
from collections import defaultdict
import time
import threading
class StreamingMetrics:
def __init__(self):
self.metrics = defaultdict(list)
self.active_streams = 0
self.lock = threading.Lock()
def start_stream(self, stream_id):
with self.lock:
self.active_streams += 1
self.metrics[stream_id] = {
'start_time': time.time(),
'tokens_received': 0,
'total_chars': 0,
'first_token_time': None
}
def record_token(self, stream_id, token):
with self.lock:
if stream_id in self.metrics:
metrics = self.metrics[stream_id]
metrics['tokens_received'] += 1
metrics['total_chars'] += len(token)
if metrics['first_token_time'] is None:
metrics['first_token_time'] = time.time()
def end_stream(self, stream_id):
with self.lock:
self.active_streams -= 1
if stream_id in self.metrics:
metrics = self.metrics[stream_id]
metrics['end_time'] = time.time()
# Calculate performance metrics
total_time = metrics['end_time'] - metrics['start_time']
time_to_first_token = (metrics['first_token_time'] -
metrics['start_time']
if metrics['first_token_time'] else 0)
return {
'total_duration': total_time,
'time_to_first_token': time_to_first_token,
'tokens_per_second': metrics['tokens_received'] / total_time if total_time > 0 else 0,
'chars_per_second': metrics['total_chars'] / total_time if total_time > 0 else 0,
'total_tokens': metrics['tokens_received'],
'total_chars': metrics['total_chars']
}
def get_current_stats(self):
with self.lock:
return {
'active_streams': self.active_streams,
'total_streams': len(self.metrics)
}
metrics = StreamingMetrics()
# Enhanced streaming endpoint with metrics
@app.route('/stream-chat-with-metrics', methods=['POST'])
def stream_chat_with_metrics():
user_message = request.json.get('message')
conversation_history = request.json.get('history', [])
stream_id = str(uuid.uuid4())
messages = conversation_history + [
{"role": "user", "content": user_message}
]
def generate_with_metrics():
metrics.start_stream(stream_id)
try:
stream = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=1000,
stream=True
)
complete_response = ""
for chunk in stream:
if chunk.choices[0].delta.content is not None:
token = chunk.choices[0].delta.content
complete_response += token
# Record metrics
metrics.record_token(stream_id, token)
yield f"data: {json.dumps({
'token': token,
'type': 'token'
})}\n\n"
# End stream and get performance data
performance = metrics.end_stream(stream_id)
yield f"data: {json.dumps({
'type': 'complete',
'full_response': complete_response,
'performance': performance
})}\n\n"
except Exception as e:
metrics.end_stream(stream_id)
yield f"data: {json.dumps({
'type': 'error',
'error': str(e)
})}\n\n"
return Response(
generate_with_metrics(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Stream-ID': stream_id
}
)
@app.route('/metrics')
def get_metrics():
return jsonify(metrics.get_current_stats())
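The metrics class is easy to exercise outside Flask, which also makes the reported numbers concrete. A quick illustrative run:
import time

m = StreamingMetrics()
m.start_stream("demo")
for tok in ["Hello", ",", " world", "!"]:
    time.sleep(0.05)          # simulate generation latency
    m.record_token("demo", tok)
report = m.end_stream("demo")
print(report["time_to_first_token"], report["tokens_per_second"])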
Real production systems often need to work with multiple LLM providers. Here's a unified streaming interface that works with OpenAI, Anthropic, and other providers:
from abc import ABC, abstractmethod
import anthropic
import openai
class StreamingProvider(ABC):
@abstractmethod
def stream_response(self, messages, **kwargs):
pass
class OpenAIStreaming(StreamingProvider):
def __init__(self, api_key):
self.client = openai.OpenAI(api_key=api_key)
def stream_response(self, messages, model="gpt-3.5-turbo", **kwargs):
stream = self.client.chat.completions.create(
model=model,
messages=messages,
stream=True,
**kwargs
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield {
'token': chunk.choices[0].delta.content,
'finish_reason': chunk.choices[0].finish_reason,
'provider': 'openai'
}
class AnthropicStreaming(StreamingProvider):
def __init__(self, api_key):
self.client = anthropic.Anthropic(api_key=api_key)
def stream_response(self, messages, model="claude-3-haiku-20240307", **kwargs):
# Convert OpenAI format to Anthropic format
system_message = ""
formatted_messages = []
for msg in messages:
if msg['role'] == 'system':
system_message = msg['content']
else:
formatted_messages.append(msg)
with self.client.messages.stream(
model=model,
max_tokens=kwargs.get('max_tokens', 1000),
system=system_message,
messages=formatted_messages
) as stream:
for text in stream.text_stream:
yield {
'token': text,
'finish_reason': None,
'provider': 'anthropic'
}
class MultiProviderStreaming:
def __init__(self):
self.providers = {}
def add_provider(self, name, provider):
self.providers[name] = provider
def stream_response(self, provider_name, messages, **kwargs):
if provider_name not in self.providers:
raise ValueError(f"Provider {provider_name} not configured")
provider = self.providers[provider_name]
for token_data in provider.stream_response(messages, **kwargs):
yield token_data
# Initialize multi-provider streaming
multi_streaming = MultiProviderStreaming()
multi_streaming.add_provider('openai', OpenAIStreaming(os.getenv('OPENAI_API_KEY')))
multi_streaming.add_provider('anthropic', AnthropicStreaming(os.getenv('ANTHROPIC_API_KEY')))
@app.route('/multi-stream-chat', methods=['POST'])
def multi_stream_chat():
user_message = request.json.get('message')
provider = request.json.get('provider', 'openai')
conversation_history = request.json.get('history', [])
messages = conversation_history + [
{"role": "user", "content": user_message}
]
def generate_response():
try:
complete_response = ""
for token_data in multi_streaming.stream_response(
provider,
messages,
max_tokens=1000,
temperature=0.7
):
token = token_data['token']
complete_response += token
yield f"data: {json.dumps({
'token': token,
'type': 'token',
'provider': token_data['provider']
})}\n\n"
yield f"data: {json.dumps({
'type': 'complete',
'full_response': complete_response,
'provider': provider
})}\n\n"
except Exception as e:
yield f"data: {json.dumps({
'type': 'error',
'error': str(e),
'provider': provider
})}\n\n"
return Response(
generate_response(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
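One common extension, not implemented above, is failing over to a second provider when the first one errors out before producing any output. A sketch of that idea on top of MultiProviderStreaming (the function name and its behavior are my own assumptions):
def stream_with_fallback(multi, messages, providers=("openai", "anthropic"), **kwargs):
    # Try providers in order; only fall back if a provider fails before
    # yielding anything, so the user never sees a response restart mid-stream.
    last_error = None
    for name in providers:
        yielded_any = False
        try:
            for token_data in multi.stream_response(name, messages, **kwargs):
                yielded_any = True
                yield token_data
            return
        except Exception as exc:
            if yielded_any:
                raise
            last_error = exc
    raise RuntimeError(f"All providers failed; last error: {last_error}")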
Let's build a practical application that combines RAG (Retrieval Augmented Generation) with streaming responses. This system will search through uploaded documents and stream AI-generated answers in real-time.
from flask import Flask, request, Response, jsonify, render_template
import openai
import json
import os
from datetime import datetime
import chromadb
from sentence_transformers import SentenceTransformer
import PyPDF2
import io
app = Flask(__name__)
class StreamingRAGSystem:
def __init__(self):
self.embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
self.chroma_client = chromadb.Client()
self.collection = self.chroma_client.create_collection(
name="documents",
get_or_create=True
)
self.openai_client = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
def process_document(self, file_content, filename):
"""Extract text from uploaded PDF and add to vector database"""
pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_content))
chunks = []
for page_num, page in enumerate(pdf_reader.pages):
text = page.extract_text()
# Split into smaller chunks for better retrieval
sentences = text.split('. ')
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < 500:
current_chunk += sentence + ". "
else:
if current_chunk.strip():
chunks.append({
'text': current_chunk.strip(),
'page': page_num + 1,
'filename': filename
})
current_chunk = sentence + ". "
if current_chunk.strip():
chunks.append({
'text': current_chunk.strip(),
'page': page_num + 1,
'filename': filename
})
# Generate embeddings and add to ChromaDB
texts = [chunk['text'] for chunk in chunks]
embeddings = self.embeddings_model.encode(texts).tolist()
ids = [f"{filename}_{i}" for i in range(len(chunks))]
metadatas = [
{
'page': chunk['page'],
'filename': chunk['filename']
}
for chunk in chunks
]
self.collection.add(
embeddings=embeddings,
documents=texts,
metadatas=metadatas,
ids=ids
)
return len(chunks)
def retrieve_relevant_chunks(self, query, n_results=5):
"""Find most relevant document chunks for the query"""
query_embedding = self.embeddings_model.encode([query]).tolist()
results = self.collection.query(
query_embeddings=query_embedding,
n_results=n_results
)
return results
def stream_rag_response(self, query):
"""Generate streaming response using retrieved documents"""
# First, retrieve relevant documents
yield f"data: {json.dumps({'type': 'status', 'message': 'Searching documents...'})}\n\n"
retrieval_results = self.retrieve_relevant_chunks(query)
if not retrieval_results['documents'][0]:
yield f"data: {json.dumps({'type': 'error', 'error': 'No relevant documents found'})}\n\n"
return
# Build context from retrieved documents
context_docs = []
for i, doc in enumerate(retrieval_results['documents'][0]):
metadata = retrieval_results['metadatas'][0][i]
context_docs.append(f"[{metadata['filename']}, Page {metadata['page']}]: {doc}")
context = "\n\n".join(context_docs)
yield f"data: {json.dumps({'type': 'sources', 'sources': retrieval_results['metadatas'][0]})}\n\n"
yield f"data: {json.dumps({'type': 'status', 'message': 'Generating answer...'})}\n\n"
# Create prompt with context
prompt = f"""Based on the following documents, answer the user's question. If the answer isn't in the documents, say so clearly.
Context from documents:
{context}
User question: {query}
Answer:"""
messages = [{"role": "user", "content": prompt}]
try:
stream = self.openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
max_tokens=1000,
temperature=0.3,
stream=True
)
complete_response = ""
for chunk in stream:
if chunk.choices[0].delta.content is not None:
token = chunk.choices[0].delta.content
complete_response += token
yield f"data: {json.dumps({'token': token, 'type': 'token'})}\n\n"
yield f"data: {json.dumps({
'type': 'complete',
'full_response': complete_response,
'sources_used': len(retrieval_results['documents'][0])
})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
rag_system = StreamingRAGSystem()
@app.route('/')
def index():
return render_template('rag_chat.html')
@app.route('/upload-document', methods=['POST'])
def upload_document():
if 'file' not in request.files:
return jsonify({'error': 'No file uploaded'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
if not file.filename.lower().endswith('.pdf'):
return jsonify({'error': 'Only PDF files are supported'}), 400
try:
file_content = file.read()
chunks_added = rag_system.process_document(file_content, file.filename)
return jsonify({
'message': f'Document processed successfully',
'filename': file.filename,
'chunks_added': chunks_added
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/stream-rag-query', methods=['POST'])
def stream_rag_query():
query = request.json.get('query')
if not query:
return jsonify({'error': 'No query provided'}), 400
return Response(
rag_system.stream_rag_response(query),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
)
if __name__ == '__main__':
app.run(debug=True)
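It's worth testing the indexing and retrieval steps on their own, with no Flask and no OpenAI call, before layering streaming on top. A quick check you might run in a separate script or interactive session, assuming you have some PDF on disk (here called manual.pdf):
rag = StreamingRAGSystem()
with open("manual.pdf", "rb") as fh:
    n_chunks = rag.process_document(fh.read(), "manual.pdf")
print(f"Indexed {n_chunks} chunks")

hits = rag.retrieve_relevant_chunks("How do I reset the device?", n_results=3)
for doc, meta in zip(hits["documents"][0], hits["metadatas"][0]):
    print(meta["filename"], meta["page"], doc[:80])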
<!DOCTYPE html>
<html>
<head>
<title>Streaming RAG Document Q&A</title>
<style>
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
max-width: 1000px;
margin: 0 auto;
padding: 20px;
background-color: #f8f9fa;
}
.container {
background: white;
border-radius: 12px;
box-shadow: 0 4px 15px rgba(0,0,0,0.1);
overflow: hidden;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
text-align: center;
}
.upload-section {
padding: 20px;
border-bottom: 1px solid #e9ecef;
background-color: #f8f9fa;
}
.upload-area {
border: 2px dashed #dee2e6;
border-radius: 8px;
padding: 20px;
text-align: center;
cursor: pointer;
transition: all 0.3s ease;
}
.upload-area:hover {
border-color: #667eea;
background-color: #f0f4ff;
}
.upload-area.drag-over {
border-color: #667eea;
background-color: #e6f0ff;
}
.messages {
height: 500px;
overflow-y: auto;
padding: 20px;
}
.message {
margin-bottom: 20px;
padding: 15px;
border-radius: 12px;
max-width: 80%;
}
.user-message {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
margin-left: auto;
}
.ai-message {
background-color: #f8f9fa;
border: 1px solid #e9ecef;
}
.status-message {
background-color: #fff3cd;
border: 1px solid #ffeaa7;
color: #856404;
text-align: center;
font-style: italic;
}
.sources {
background-color: #e7f3ff;
border: 1px solid #b3d9ff;
color: #0056b3;
margin-bottom: 10px;
}
.sources h4 {
margin: 0 0 10px 0;
color: #0056b3;
}
.source-item {
background: white;
padding: 8px;
margin: 5px 0;
border-radius: 4px;
font-size: 0.9em;
}
.input-section {
padding: 20px;
background-color: #f8f9fa;
border-top: 1px solid #e9ecef;
}
.input-container {
display: flex;
gap: 10px;
}
.input-container input {
flex: 1;
padding: 12px;
border: 1px solid #dee2e6;
border-radius: 8px;
font-size: 16px;
}
.input-container input:focus {
outline: none;
border-color: #667eea;
box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
}
.btn {
padding: 12px 24px;
border: none;
border-radius: 8px;
cursor: pointer;
font-size: 16px;
font-weight: 500;
transition: all 0.3s ease;
}
.btn-primary {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
.btn-primary:hover {
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(102, 126, 234, 0.4);
}
.btn:disabled {
opacity: 0.6;
cursor: not-allowed;
transform: none;
}
.cursor {
animation: blink 1s infinite;
}
@keyframes blink {
0%, 50% { opacity: 1; }
51%, 100% { opacity: 0; }
}
.uploaded-files {
margin-top: 15px;
}
.file-item {
background: white;
padding: 10px;
margin: 5px 0;
border-radius: 6px;
border: 1px solid #e9ecef;
display: flex;
justify-content: space-between;
align-items: center;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>📚 Streaming RAG Document Q&A</h1>
<p>Upload documents and ask questions with real-time AI responses</p>
</div>
<div class="upload-section">
<div class="upload-area" id="uploadArea">
<h3>📎 Upload PDF Documents</h3>
<p>Drag and drop PDF files here or click to select</p>
<input type="file" id="fileInput" accept=".pdf" multiple style="display: none;">
<button class="btn btn-primary" onclick="document.getElementById('fileInput').click()">
Choose Files
</button>
</div>
<div class="uploaded-files" id="uploadedFiles"></div>
</div>
<div class="messages" id="messages"></div>
<div class="input-section">
<div class="input-container">
<input type="text" id="queryInput" placeholder="Ask a question about your documents...">
<button class="btn btn-primary" id="askButton" onclick="askQuestion()">Ask</button>
</div>
</div>
</div>
<script>
let currentMessageElement = null;
let uploadedFiles = [];
// File upload handling
document.getElementById('fileInput').addEventListener('change', handleFileUpload);
const uploadArea = document.getElementById('uploadArea');
uploadArea.addEventListener('dragover', handleDragOver);
uploadArea.addEventListener('dragleave', handleDragLeave);
uploadArea.addEventListener('drop', handleDrop);
function handleDragOver(e) {
e.preventDefault();
uploadArea.classList.add('drag-over');
}
function handleDragLeave(e) {
e.preventDefault();
uploadArea.classList.remove('drag-over');
}
function handleDrop(e) {
e.preventDefault();
uploadArea.classList.remove('drag-over');
const files = e.dataTransfer.files;
processFiles(files);
}
function handleFileUpload(e) {
const files = e.target.files;
processFiles(files);
}
function processFiles(files) {
for (let file of files) {
if (file.type === 'application/pdf') {
uploadFile(file);
} else {
addMessage('Only PDF files are supported', false, 'status-message');
}
}
}
function uploadFile(file) {
const formData = new FormData();
formData.append('file', file);
addMessage(`Uploading ${file.name}...`, false, 'status-message');
fetch('/upload-document', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
if (data.error) {
addMessage(`Error uploading ${file.name}: ${data.error}`, false, 'status-message');
} else {
uploadedFiles.push(data);
addMessage(`✅ ${data.filename} uploaded successfully (${data.chunks_added} chunks processed)`, false, 'status-message');
updateUploadedFilesList();
}
})
.catch(error => {
addMessage(`Error uploading ${file.name}: ${error.message}`, false, 'status-message');
});
}
function updateUploadedFilesList() {
const container = document.getElementById('uploadedFiles');
container.innerHTML = '<h4>📄 Uploaded Documents:</h4>';
uploadedFiles.forEach(file => {
const fileDiv = document.createElement('div');
fileDiv.className = 'file-item';
fileDiv.innerHTML = `
<span>${file.filename}</span>
<span>${file.chunks_added} chunks</span>
`;
container.appendChild(fileDiv);
});
}
function addMessage(content, isUser = false, className = null) {
const messagesContainer = document.getElementById('messages');
const messageDiv = document.createElement('div');
if (className) {
messageDiv.className = `message ${className}`;
} else {
messageDiv.className = `message ${isUser ? 'user-message' : 'ai-message'}`;
}
messageDiv.textContent = content;
messagesContainer.appendChild(messageDiv);
messagesContainer.scrollTop = messagesContainer.scrollHeight;
return messageDiv;
}
function addSourcesMessage(sources) {
const messagesContainer = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = 'message sources';
let sourcesHtml = '<h4>📑 Sources Found:</h4>';
sources.forEach(source => {
sourcesHtml += `<div class="source-item">${source.filename}, Page ${source.page}</div>`;
});
messageDiv.innerHTML = sourcesHtml;
messagesContainer.appendChild(messageDiv);
messagesContainer.scrollTop = messagesContainer.scrollHeight;
return messageDiv;
}
function askQuestion() {
const input = document.getElementById('queryInput');
const askButton = document.getElementById('askButton');
const query = input.value.trim();
if (!query) return;
if (uploadedFiles.length === 0) {
addMessage('Please upload at least one PDF document first.', false, 'status-message');
return;
}
// Add user question to UI
addMessage(query, true);
// Clear input and disable button
input.value = '';
askButton.disabled = true;
// Start streaming response
startRAGStreaming(query);
}
function startRAGStreaming(query) {
fetch('/stream-rag-query', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ query: query })
})
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let aiResponse = '';
let sourcesShown = false;
function readChunk() {
return reader.read().then(({ done, value }) => {
if (done) {
finishResponse(aiResponse);
return;
}
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (let line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if (data.type === 'status') {
if (currentMessageElement) {
currentMessageElement.textContent = data.message;
} else {
currentMessageElement = addMessage(data.message, false, 'status-message');
}
} else if (data.type === 'sources' && !sourcesShown) {
addSourcesMessage(data.sources);
sourcesShown = true;
currentMessageElement = addMessage('', false);
} else if (data.type === 'token') {
if (!currentMessageElement || currentMessageElement.classList.contains('status-message')) {
currentMessageElement = addMessage('', false);
}
aiResponse += data.token;
currentMessageElement.innerHTML = aiResponse + '<span class="cursor">|</span>';
} else if (data.type === 'complete') {
finishResponse(data.full_response);
return;
} else if (data.type === 'error') {
handleError(data.error);
return;
}
} catch (e) {
console.error('Error parsing SSE data:', e);
}
}
}
return readChunk();
});
}
return readChunk();
})
.catch(error => {
handleError(error.message);
});
}
function finishResponse(fullResponse) {
if (currentMessageElement) {
currentMessageElement.textContent = fullResponse;
}
document.getElementById('askButton').disabled = false;
document.getElementById('queryInput').focus();
}
function handleError(errorMessage) {
addMessage(`❌ Error: ${errorMessage}`, false, 'status-message');
document.getElementById('askButton').disabled = false;
}
// Handle Enter key in input
document.getElementById('queryInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
askQuestion();
}
});
// Focus input on page load
window.onload = function() {
document.getElementById('queryInput').focus();
};
</script>
</body>
</html>
This complete RAG implementation provides a production-ready example of streaming AI interfaces with document retrieval, source citations, and real-time performance feedback.
One of the most frequent problems in streaming implementations is improper buffer handling. Tokens can arrive faster than your UI can process them, leading to lag or dropped content.
Problem: UI freezes when processing rapid token streams
// Problematic approach - processing every token immediately
for (let token of tokenStream) {
document.getElementById('output').innerHTML += token;
// This can cause performance issues with rapid updates
}
Solution: Implement token buffering and batched updates
class TokenBuffer {
constructor(updateCallback, batchSize = 5, maxDelay = 50) {
this.buffer = '';
this.updateCallback = updateCallback;
this.batchSize = batchSize;
this.maxDelay = maxDelay;
this.tokenCount = 0;
this.lastUpdate = Date.now();
}
addToken(token) {
this.buffer += token;
this.tokenCount++;
const timeSinceLastUpdate = Date.now() - this.lastUpdate;
// Update if we have enough tokens or enough time has passed
if (this.tokenCount >= this.batchSize || timeSinceLastUpdate >= this.maxDelay) {
this.flush();
}
}
flush() {
if (this.buffer) {
this.updateCallback(this.buffer);
this.buffer = '';
this.tokenCount = 0;
this.lastUpdate = Date.now();
}
}
}
// Usage
const buffer = new TokenBuffer((content) => {
document.getElementById('output').innerHTML = fullResponse + content;
}, 3, 30);
Network interruptions are inevitable in production streaming applications. Implement robust recovery mechanisms:
class StreamingConnection {
constructor(endpoint, maxRetries = 3, retryDelay = 1000) {
this.endpoint = endpoint;
this.maxRetries = maxRetries;
this.retryDelay = retryDelay;
this.currentRetries = 0;
this.isConnected = false;
this.onToken = null;
this.onError = null;
this.onComplete = null;
}
async start(requestData) {
try {
const response = await fetch(this.endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(requestData)
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
this.isConnected = true;
this.currentRetries = 0;
return this.processStream(response);
} catch (error) {
this.handleConnectionError(error, requestData);
}
}
async processStream(response) {
const reader = response.body.getReader();
const decoder = new TextDecoder();
try {
while (this.isConnected) {
const { done, value } = await reader.read();
if (done) {
if (this.onComplete) this.onComplete();
break;
}
const chunk = decoder.decode(value, { stream: true });
this.processChunk(chunk);
}
} catch (error) {
if (this.isConnected) {
this.handleStreamError(error);
}
} finally {
reader.releaseLock();
}
}
processChunk(chunk) {
const lines = chunk.split('\n');
for (let line of lines) {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.slice(6));
if (data.type === 'token' && this.onToken) {
this.onToken(data.token);
} else if (data.type === 'error' && this.onError) {
this.onError(data.error);
}
} catch (e) {
console.warn('Failed to parse SSE data:', line);
}
}
}
}
handleConnectionError(error, requestData) {
if (this.currentRetries < this.maxRetries) {
this.currentRetries++;
console.log(`Connection failed, retrying (${this.currentRetries}/${this.maxRetries})...`);
setTimeout(() => {
this.start(requestData);
}, this.retryDelay * this.currentRetries);
} else {
if (this.onError) {
this.onError(`Connection failed after ${this.maxRetries} attempts: ${error.message}`);
}
}
}
handleStreamError(error) {
this.isConnected = false;
if (this.onError) {
this.onError(`Stream error: ${error.message}`);
}
}
disconnect() {
this.isConnected = false;
}
}
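Recovery matters on the backend as well. If the upstream model call fails before any token has been sent, the server can quietly retry with exponential backoff instead of surfacing an error; once tokens have gone out, retrying would duplicate visible text, so the error should propagate. A sketch along those lines, assuming the OpenAI client object used in earlier snippets:
import time

def stream_with_backoff(client, messages, max_attempts=3, base_delay=1.0, **kwargs):
    # Retry the upstream call while nothing has been yielded yet;
    # after the first token, errors are re-raised to the caller.
    for attempt in range(1, max_attempts + 1):
        started = False
        try:
            stream = client.chat.completions.create(messages=messages, stream=True, **kwargs)
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content is not None:
                    started = True
                    yield content
            return
        except Exception:
            if started or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

Callers pass the model and other parameters through kwargs, for example stream_with_backoff(client, messages, model="gpt-3.5-turbo", max_tokens=500).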
Streaming interfaces that run for extended periods can accumulate memory leaks if not properly managed:
Problem: DOM elements accumulating without cleanup
// Problematic - unbounded message history
function addMessage(content) {
const messageDiv = document.createElement('div');
messageDiv.textContent = content;
document.getElementById('messages').appendChild(messageDiv);
// Messages accumulate indefinitely
}
Solution: Implement message history limits and cleanup
class MessageManager {
constructor(containerId, maxMessages = 100) {
this.container = document.getElementById(containerId);
this.maxMessages = maxMessages;
this.messageCount = 0;
}
addMessage(content, className = '') {
const messageDiv = document.createElement('div');
messageDiv.className = className;
messageDiv.textContent = content;
this.container.appendChild(messageDiv);
this.messageCount++;
// Remove old messages if we exceed the limit
if (this.messageCount > this.maxMessages) {
const oldestMessage = this.container.firstChild;
if (oldestMessage) {
this.container.removeChild(oldestMessage);
this.messageCount--;
}
}
this.container.scrollTop = this.container.scrollHeight;
return messageDiv;
}
clear() {
this.container.innerHTML = '';
this.messageCount = 0;
}
}
Track streaming performance to identify bottlenecks:
class StreamingPerformanceMonitor {
constructor() {
this.metrics = {
startTime: null,
firstTokenTime: null,
tokensReceived: 0,
totalCharacters: 0,
averageTokenInterval: 0,
tokenIntervals: []
};
this.lastTokenTime = null;
}
startStream() {
this.metrics.startTime = performance.now();
this.lastTokenTime = this.metrics.startTime;
}
recordToken(token) {
const now = performance.now();
if (this.metrics.firstTokenTime === null) {
this.metrics.firstTokenTime = now;
}
if (this.lastTokenTime !== null) {
const interval = now - this.lastTokenTime;
this.metrics.tokenIntervals.push(interval);
// Keep only last 50 intervals for average calculation
if (this.metrics.tokenIntervals.length > 50) {
this.metrics.tokenIntervals.shift();
}
this.metrics.averageTokenInterval =
this.metrics.tokenIntervals.reduce((a, b) => a + b, 0) /
this.metrics.tokenIntervals.length;
}
this.metrics.tokensReceived++;
this.metrics.totalCharacters += token.length;
this.lastTokenTime = now;
}
getPerformanceReport() {
const endTime = performance.now();
const totalTime = endTime - this.metrics.startTime;
const timeToFirstToken = this.metrics.firstTokenTime - this.metrics.startTime;
return {
totalDuration: Math.round(totalTime),
timeToFirstToken: Math.round(timeToFirstToken),
tokensPerSecond: Math.round((this.metrics.tokensReceived / totalTime) * 1000),
charactersPerSecond: Math.round((this.metrics.totalCharacters / totalTime) * 1000),
averageTokenInterval: Math.round(this.metrics.averageTokenInterval),
totalTokens: this.metrics.tokensReceived,
totalCharacters: this.metrics.totalCharacters
};
}
}
Performance Tip: Monitor your token intervals. If they become irregular or slow, it might indicate network issues, API rate limiting, or backend performance problems.
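The same idea can run server-side. Here is a tiny helper that flags a stream whose recent inter-token gaps are much slower than its early ones; the window and factor are arbitrary starting points, not tuned values:
def intervals_look_unhealthy(intervals, window=20, factor=3.0):
    # Compare the average of the most recent gaps to the average of the
    # earliest ones; a large ratio suggests rate limiting or network trouble.
    if len(intervals) < 2 * window:
        return False
    baseline = sum(intervals[:window]) / window
    recent = sum(intervals[-window:]) / window
    return baseline > 0 and recent > factor * baseline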
You've now built comprehensive streaming AI interfaces that handle real-time token delivery, connection management, error recovery, and performance optimization. These patterns form the foundation of responsive AI applications that feel natural and engaging to users.
The techniques you've learned apply directly to customer service chatbots, document Q&A systems like the RAG example above, and any interface where users would otherwise wait on a long model response.
Your streaming interfaces are now ready for production deployment. Focus next on scaling patterns, monitoring systems, and advanced user experience optimizations that make AI feel truly conversational and intelligent.