import asyncio
from typing import List, Dict, Any
import logging
from ..models import FeedImportPayload, FeedItem
from .text_embedding import TextEmbeddingService
logger = logging.getLogger(__name__)
class FeedImportService:
"""Service for importing feed data and computing embeddings."""
def __init__(self, store, embedding_service: TextEmbeddingService):
self.store = store
self.embedding_service = embedding_service
async def import_async(self, request: FeedImportPayload) -> None:
"""Import feed data and compute embeddings."""
account = request.account
items = request.items
logger.info(f"Importing {len(items)} items for {account.get('did', 'unknown')}")
# Store raw data
self.store.import_feed(request)
# Compute embeddings for items with text
texts = []
text_items = []
for item in items:
text = item.get("text", "")
if text:
texts.append(text)
text_items.append(item)
if texts:
# Compute embeddings
embeddings = self.embedding_service.fit_transform(texts)
# Store embeddings
for item, embedding in zip(text_items, embeddings):
self.store.store_embedding(item.get("subjectUri", ""), embedding)
logger.info(f"Imported {len(items)} items with {len(texts)} embeddings")
async def compute_missing_embeddings(self) -> None:
"""Compute embeddings for items that don't have them."""
# Get all items
all_embeddings = self.store.get_all_embeddings()
# This would scan for items without embeddings
# For now, skipped for simplicity
pass