import os
import re
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import base64
import json
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
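class FeedRequestIdentityResolver:
    """Resolve the viewer identity claimed by a feed request.

    Nothing in this class authenticates the caller: the ``actor`` argument is
    returned as given, and the bearer token's payload is decoded without
    checking its signature, expiry or audience. The result is therefore a
    claim, not a verified identity, and callers should not use it for
    authorization or to gate per-user data until token validation exists.
    """

    def __init__(self):
        # Placeholder for a service auth token; nothing in this class reads it.
        self.service_auth_token: Optional[str] = None

    @staticmethod
    def _find_authorization(request_headers: Dict[str, str]) -> str:
        """Return the Authorization header value, or "" when absent or not a string."""
        # The exact lowercase key wins, as before; other casings are a fallback
        # because HTTP header names are case-insensitive.
        value = request_headers.get("authorization")
        if value is None:
            for name, candidate in request_headers.items():
                if isinstance(name, str) and name.lower() == "authorization":
                    value = candidate
                    break
        return value if isinstance(value, str) else ""

    def resolve_viewer_did(self, request_headers: Dict[str, str],
                           actor: Optional[str] = None) -> Optional[str]:
        """Return the DID the request claims to act for, or ``None``.

        Args:
            request_headers: Request headers; ``Authorization`` is matched
                case-insensitively.
            actor: Identity supplied by the caller. When truthy it is returned
                unchanged and the headers are not read.

        Returns:
            ``actor`` if truthy; otherwise the first non-empty string among the
            token's ``sub`` and ``aud`` claims; otherwise ``None``.
        """
        # TODO(security): the actor parameter is trusted as-is; it should be
        # replaced by (or checked against) a validated service auth token.
        if actor:
            return actor

        auth_header = self._find_authorization(request_headers)
        if not auth_header.startswith("Bearer "):
            return None

        segments = auth_header[len("Bearer "):].split(".")
        if len(segments) < 2:
            return None

        # TODO(security): the JWT signature, expiry and audience are NOT
        # verified; anyone can mint a token that decodes to any claim values.
        try:
            raw_payload = segments[1]
            # JWT segments drop base64 padding; restore exactly what is missing.
            padded = raw_payload + "=" * (-len(raw_payload) % 4)
            payload = json.loads(base64.urlsafe_b64decode(padded))
        except Exception as e:
            # Deliberately broad: the token is untrusted input and a malformed
            # one must never raise out of identity resolution.
            logger.warning("Failed to parse auth token: %s", e)
            return None

        if not isinstance(payload, dict):
            logger.warning("Failed to parse auth token: payload is not a JSON object")
            return None

        # Only string claims are usable as a DID; `aud` may legally be a list
        # (RFC 7519), which must not leak out through an Optional[str] return.
        # NOTE(review): `aud` names the token's intended recipient, not the
        # party it was issued for, so this fallback may return the receiving
        # service's own identifier. Confirm which claim carries the viewer.
        for claim in ("sub", "aud"):
            value = payload.get(claim)
            if isinstance(value, str) and value:
                return value
        return None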
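class OAuthOriginResolver:
    """Resolve the public origin (``scheme://host[:port]``) used for OAuth.

    The ``BLUESKY_OAUTH_PUBLIC_ORIGIN`` environment variable, when set, pins
    the origin. Otherwise the origin is built from the request's scheme and
    host, which are client-influenced unless the framework or reverse proxy
    validates them. Deployments that derive OAuth URLs from this value should
    set the environment variable.
    """

    # Characters permitted in a host[:port] authority, including a bracketed
    # IPv6 literal. Anything else ("/", "@", "?", "#", whitespace, ...) would
    # change which authority or path the resulting URL points at.
    _HOST_PATTERN = re.compile(r"[A-Za-z0-9._\-\[\]:]+")

    @staticmethod
    def resolve(http_context: Dict[str, Any], config: Dict[str, Any]) -> Optional[str]:
        """Return the public origin for OAuth, or ``None`` if none can be built.

        Args:
            http_context: Request context; the ``scheme`` and ``host`` keys
                are read.
            config: Accepted for interface compatibility; not read here.

        Returns:
            The environment-configured origin if set; otherwise
            ``scheme://host`` from the request when the scheme is http/https
            and the host is a well-formed authority; otherwise ``None``.
        """
        # An explicitly configured origin always wins over request data.
        pinned = os.environ.get("BLUESKY_OAUTH_PUBLIC_ORIGIN", "")
        if pinned:
            return pinned

        # A present-but-empty/None scheme falls back to https instead of
        # producing "None://host" or "://host".
        scheme = http_context.get("scheme") or "https"
        if not isinstance(scheme, str) or scheme.lower() not in ("http", "https"):
            return None

        host = http_context.get("host", "")
        if not isinstance(host, str) or not OAuthOriginResolver._HOST_PATTERN.fullmatch(host):
            return None

        return f"{scheme.lower()}://{host}"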