Data Flow & Processing Pipeline

Complete Data Flow Architecture

This document describes how data moves through the Prompted Forge platform, from document ingestion to query response generation.

Document Ingestion Pipeline

flowchart TB
    subgraph "Input Sources"
        FILE[Document Upload]
        URL[Web Scraping]
        TEXT[Direct Text]
        AUDIO[Audio Files]
        VIDEO[Video Files]
        HOTDIR[Hot Directory Monitor]
    end

    subgraph "Collector Service"
        DETECT[File Type Detection]
        PARSE[Content Parser]
        OCR[OCR Processing]
        WHISPER[Audio Transcription]
        CLEAN[Content Cleaning]
        CHUNK[Text Chunking]
        EMBED[Embedding Generation]
    end

    subgraph "Storage Layer"
        BLOB[(Blob Storage)]
        RELDB[(Relational DB)]
        VECTOR[(Vector DB)]
        CACHE[(Cache Layer)]
    end

    subgraph "Validation & Processing"
        VALIDATE[Content Validation]
        DEDUP[Deduplication]
        METADATA[Metadata Extraction]
        INDEX[Vector Indexing]
    end

    FILE --> DETECT
    URL --> DETECT
    TEXT --> CLEAN
    AUDIO --> WHISPER
    VIDEO --> WHISPER
    HOTDIR --> DETECT

    DETECT --> PARSE
    PARSE --> OCR
    OCR --> CLEAN
    PARSE --> CLEAN
    WHISPER --> CLEAN

    CLEAN --> CHUNK
    CHUNK --> VALIDATE
    VALIDATE --> DEDUP
    DEDUP --> METADATA

    METADATA --> EMBED
    EMBED --> VECTOR
    METADATA --> RELDB
    PARSE --> BLOB

    VECTOR --> INDEX
    INDEX --> CACHE
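
The middle of the ingestion flow (CLEAN, CHUNK, VALIDATE, DEDUP) can be sketched in a few lines. This is a minimal illustration, not the Collector Service's actual code: the function names, the fixed-size character windows, the overlap values, and the SHA-256 deduplication are all assumptions chosen for clarity.

```python
import hashlib
import re


def clean(text: str) -> str:
    """CLEAN: collapse runs of whitespace and trim both ends."""
    return re.sub(r"\s+", " ", text).strip()


def chunk(text: str, size: int = 40, overlap: int = 10) -> list[str]:
    """CHUNK: split text into fixed-size character windows that overlap."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than size")
    step = size - overlap
    return [text[i:i + size] for i in range(0, len(text), step)]


def dedup(chunks: list[str]) -> list[str]:
    """DEDUP: keep the first occurrence of each chunk, compared by SHA-256."""
    seen: set[str] = set()
    unique: list[str] = []
    for piece in chunks:
        digest = hashlib.sha256(piece.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(piece)
    return unique


def ingest(raw: str) -> list[str]:
    """CLEAN -> CHUNK -> VALIDATE (drop blank chunks) -> DEDUP."""
    pieces = chunk(clean(raw))
    valid = [piece for piece in pieces if piece.strip()]
    return dedup(valid)
```

In this sketch the chunks returned by `ingest` are what would be handed to metadata extraction and embedding generation; a real pipeline would more likely chunk on token or sentence boundaries.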

Query Processing Pipeline

flowchart LR
    subgraph "User Interface"
        CHAT[Chat Interface]
        API[API Client]
        WS[WebSocket Connection]
    end

    subgraph "Query Processing"
        PARSE_Q[Query Parsing]
        EMBED_Q[Query Embedding]
        FILTER[Context Filtering]
        SEARCH[Vector Search]
        RANK[Result Ranking]
        CONTEXT[Context Assembly]
    end

    subgraph "Storage Access"
        VECTOR_Q[(Vector Search)]
        RELDB_Q[(Metadata Lookup)]
        CACHE_Q[(Cache Check)]
    end

    subgraph "Response Generation"
        LLM[LLM Processing]
        STREAM[Response Streaming]
        PERSIST[Response Persistence]
    end

    subgraph "External Services"
        OPENAI[OpenAI/GPT]
        CLAUDE[Anthropic Claude]
        LOCAL[Local LLM]
        EMBED_SERVICE[Embedding Service]
    end

    CHAT --> PARSE_Q
    API --> PARSE_Q
    WS --> PARSE_Q

    PARSE_Q --> CACHE_Q
    CACHE_Q --> EMBED_Q
    EMBED_Q --> EMBED_SERVICE
    EMBED_SERVICE --> FILTER

    FILTER --> SEARCH
    SEARCH --> VECTOR_Q
    VECTOR_Q --> RANK
    RANK --> RELDB_Q
    RELDB_Q --> CONTEXT

    CONTEXT --> LLM
    LLM --> OPENAI
    LLM --> CLAUDE
    LLM --> LOCAL

    OPENAI --> STREAM
    CLAUDE --> STREAM
    LOCAL --> STREAM

    STREAM --> WS
    STREAM --> PERSIST
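
The retrieval half of this flow (EMBED_Q, SEARCH, RANK, CONTEXT) is sketched below over an in-memory list of documents. The bag-of-words `embed` function is a toy stand-in for the embedding service, and `build_context` and its `top_k` parameter are hypothetical names; the real platform would call an embedding model and a vector database instead.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy stand-in for the embedding service: a bag-of-words count vector."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[word] for word, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def build_context(query: str, docs: list[str], top_k: int = 2) -> str:
    """EMBED_Q -> SEARCH -> RANK -> CONTEXT for an in-memory document list."""
    q_vec = embed(query)
    scored = [(cosine(q_vec, embed(doc)), doc) for doc in docs]
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)[:top_k]
    # Keep only documents that share at least one term with the query.
    return "\n".join(doc for score, doc in ranked if score > 0)
```

The string returned by `build_context` plays the role of the assembled context that is passed on to the LLM step.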

Astra DB Data Movement

sequenceDiagram
    participant App as Application
    participant Bundle as Secure Bundle
    participant Astra as Astra DB
    participant Vector as Vector Store
    participant Cache as Cache Layer

    Note over App,Astra: Initialization Phase
    App->>Bundle: Download secure bundle
    Bundle-->>App: Connection credentials
    App->>Astra: Establish connection
    Astra-->>App: Connection confirmed

    Note over App,Vector: Document Ingestion
    App->>Vector: Store document embeddings
    Vector->>Astra: Write vector data
    Astra-->>Vector: Write confirmed
    Vector-->>App: Storage complete

    Note over App,Cache: Query Processing
    App->>Cache: Check cached results
    alt Cache Miss
        Cache-->>App: Not found
        App->>Vector: Similarity search
        Vector->>Astra: Vector query
        Astra-->>Vector: Results
        Vector-->>App: Ranked results
        App->>Cache: Store results
    else Cache Hit
        Cache-->>App: Cached results
    end

    Note over App,Astra: Background Operations
    loop Periodic Maintenance
        App->>Astra: Health check
        Astra-->>App: Status OK
        App->>Vector: Index optimization
        Vector-->>App: Optimization complete
    end
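
The query-processing branch of the sequence above follows the cache-aside pattern: check the cache, fall back to the vector store on a miss, then store the result. A minimal sketch follows, assuming an in-process dictionary as the cache and a caller-supplied `search` function in place of the Astra-backed vector store; the class name `CacheAside` and its counters are hypothetical.

```python
from typing import Callable


class CacheAside:
    """Cache-aside lookup: the application owns both the cache and the fallback."""

    def __init__(self, search: Callable[[str], list[str]]):
        self._search = search              # stand-in for the similarity search
        self._store: dict[str, list[str]] = {}
        self.hits = 0
        self.misses = 0

    def query(self, text: str) -> list[str]:
        # Normalize case and whitespace so trivially different queries share a key.
        key = " ".join(text.lower().split())
        if key in self._store:             # Cache Hit branch
            self.hits += 1
            return self._store[key]
        self.misses += 1                   # Cache Miss branch
        results = self._search(key)
        self._store[key] = results         # "Store results" step
        return results
```

A production cache would also need an eviction policy and invalidation when documents are re-ingested, neither of which is shown here.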

Read cache: cache the data rather than the queries (up to 2.3 TB).

File Processing State Machine

stateDiagram-v2
    [*] --> Received
    Received --> TypeDetection : File uploaded
    TypeDetection --> ParsingText : Text document
    TypeDetection --> ParsingPDF : PDF document
    TypeDetection --> ParsingAudio : Audio file
    TypeDetection --> ParsingVideo : Video file
    TypeDetection --> Error : Unsupported type

    ParsingText --> ContentCleaning
    ParsingPDF --> OCRProcessing : Contains images
    ParsingPDF --> ContentCleaning : Text extractable
    ParsingAudio --> Transcription
    ParsingVideo --> Transcription

    OCRProcessing --> ContentCleaning
    Transcription --> ContentCleaning

    ContentCleaning --> TextChunking
    TextChunking --> EmbeddingGeneration
    EmbeddingGeneration --> VectorStorage
    VectorStorage --> MetadataStorage
    MetadataStorage --> Complete

    Complete --> [*]
    Error --> [*]

    state "Processing States" as processing {
        TypeDetection : File type detection
        ParsingText : Extract text content
        ParsingPDF : Parse PDF structure
        ParsingAudio : Audio preprocessing
        ParsingVideo : Extract audio track
        OCRProcessing : OCR text extraction
        Transcription : Speech-to-text
        ContentCleaning : Clean & normalize
        TextChunking : Split into chunks
        EmbeddingGeneration : Generate vectors
        VectorStorage : Store in vector DB
        MetadataStorage : Store metadata
    }
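
The transitions in this diagram can be encoded as a lookup table so that illegal moves are rejected early. The sketch below copies the state names from the diagram; the `FileJob` class and its `advance` method are hypothetical and only illustrate the idea.

```python
# Allowed transitions, transcribed from the state diagram above.
TRANSITIONS: dict[str, set[str]] = {
    "Received": {"TypeDetection"},
    "TypeDetection": {"ParsingText", "ParsingPDF", "ParsingAudio",
                      "ParsingVideo", "Error"},
    "ParsingText": {"ContentCleaning"},
    "ParsingPDF": {"OCRProcessing", "ContentCleaning"},
    "ParsingAudio": {"Transcription"},
    "ParsingVideo": {"Transcription"},
    "OCRProcessing": {"ContentCleaning"},
    "Transcription": {"ContentCleaning"},
    "ContentCleaning": {"TextChunking"},
    "TextChunking": {"EmbeddingGeneration"},
    "EmbeddingGeneration": {"VectorStorage"},
    "VectorStorage": {"MetadataStorage"},
    "MetadataStorage": {"Complete"},
    "Complete": set(),   # terminal
    "Error": set(),      # terminal
}


class FileJob:
    """Tracks one file as it moves through the processing states."""

    def __init__(self) -> None:
        self.state = "Received"
        self.history = ["Received"]

    def advance(self, next_state: str) -> None:
        """Move to next_state, or raise if the diagram has no such edge."""
        if next_state not in TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state} -> {next_state}")
        self.state = next_state
        self.history.append(next_state)

    @property
    def finished(self) -> bool:
        """True once the job has reached a terminal state."""
        return not TRANSITIONS[self.state]
```

Keeping the table as data rather than as branching logic makes it easy to compare against the diagram when either one changes.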

Real-time Data Synchronization

flowchart TB
    subgraph "Hot Directory Monitoring"
        WATCHER[File System Watcher]
        EVENTS[File Events]
        QUEUE[Processing Queue]
    end

    subgraph "Real-time Processing"
        DETECT[Change Detection]
        DIFF[Content Diff]
        UPDATE[Incremental Update]
        NOTIFY[Client Notification]
    end

    subgraph "WebSocket Streams"
        WS_SERVER[WebSocket Server]
        WS_CLIENT[Client Connections]
        BROADCAST[Broadcast Updates]
    end

    subgraph "Data Consistency"
        LOCK[Resource Locking]
        VALIDATE[Consistency Check]
        ROLLBACK[Rollback Handler]
    end

    WATCHER --> EVENTS
    EVENTS --> QUEUE
    QUEUE --> DETECT

    DETECT --> DIFF
    DIFF --> LOCK
    LOCK --> UPDATE
    UPDATE --> VALIDATE

    VALIDATE -->|consistent| NOTIFY
    VALIDATE -->|inconsistent| ROLLBACK
    NOTIFY --> WS_SERVER
    WS_SERVER --> BROADCAST
    BROADCAST --> WS_CLIENT
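
The DIFF, LOCK, UPDATE, VALIDATE path, ending in either NOTIFY or ROLLBACK, is sketched below. It assumes a single-process index guarded by `threading.Lock`, a caller-supplied consistency check, and a plain list in place of the WebSocket broadcast; `SyncedIndex` and its method names are hypothetical.

```python
import threading
from typing import Callable, Optional


class SyncedIndex:
    """Applies file changes under a lock and rolls back on failed validation."""

    def __init__(self, validate: Callable[[dict[str, str]], bool]):
        self._lock = threading.Lock()
        self._docs: dict[str, str] = {}
        self._validate = validate
        self.notifications: list[str] = []   # stand-in for the broadcast step

    def get(self, path: str) -> Optional[str]:
        """Return the stored content for path, or None if it is unknown."""
        with self._lock:
            return self._docs.get(path)

    def apply_change(self, path: str, content: str) -> bool:
        """Return True if the change was applied and clients were notified."""
        with self._lock:                              # LOCK
            previous = self._docs.get(path)
            if previous == content:                   # DIFF: nothing changed
                return False
            self._docs[path] = content                # UPDATE
            if self._validate(self._docs):            # VALIDATE
                self.notifications.append(path)       # NOTIFY
                return True
            # ROLLBACK: restore the state from before the update.
            if previous is None:
                del self._docs[path]
            else:
                self._docs[path] = previous
            return False
```

Holding the lock across update and validation keeps other writers from observing a state that may still be rolled back, at the cost of serializing all changes.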

Hybrid Query Architecture

The platform supports hybrid queries combining vector similarity with traditional filtering:

flowchart LR
    subgraph "Query Input"
        USER_Q[User Query]
        FILTERS[Filter Criteria]
        CONTEXT[Chat Context]
    end

    subgraph "Query Decomposition"
        SEMANTIC[Semantic Component]
        STRUCTURAL[Structural Component]
        TEMPORAL[Temporal Component]
    end

    subgraph "Parallel Processing"
        VECTOR_SEARCH[Vector Similarity]
        METADATA_FILTER[Metadata Filtering]
        CONTEXT_FILTER[Context Filtering]
    end

    subgraph "Result Fusion"
        SCORE_FUSION[Score Fusion]
        RANK_FUSION[Rank Fusion]
        DEDUPE[Deduplication]
    end

    subgraph "Response Assembly"
        CONTEXT_ASSEMBLY[Context Assembly]
        PROMPT_BUILDING[Prompt Building]
        LLM_CALL[LLM Generation]
    end

    USER_Q --> SEMANTIC
    FILTERS --> STRUCTURAL
    CONTEXT --> TEMPORAL

    SEMANTIC --> VECTOR_SEARCH
    STRUCTURAL --> METADATA_FILTER
    TEMPORAL --> CONTEXT_FILTER

    VECTOR_SEARCH --> SCORE_FUSION
    METADATA_FILTER --> RANK_FUSION
    CONTEXT_FILTER --> DEDUPE

    SCORE_FUSION --> CONTEXT_ASSEMBLY
    RANK_FUSION --> PROMPT_BUILDING
    DEDUPE --> LLM_CALL
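
The diagram does not say which fusion method is used. One common choice for the rank fusion step is reciprocal rank fusion (RRF), shown here purely as an illustration: each document scores `1 / (k + rank)` in every result list it appears in, and the scores are summed. The function name and the default `k = 60` are assumptions, not values taken from the platform.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several ranked lists of document IDs into a single ranking.

    Each ID appears once in the output, so fusion also deduplicates.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest score first; ties are broken by ID so the output is deterministic.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


# Example: results from vector search, metadata filtering, and context filtering.
fused = reciprocal_rank_fusion([
    ["a", "b", "c"],
    ["b", "c", "a"],
    ["b", "a", "c"],
])
```

RRF needs only rank positions, which makes it convenient when the component searches return scores on different scales; a score fusion approach would instead normalize and combine the raw similarity values.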

Data Persistence Strategy

Write Path Optimization