Related topics: Architecture Overview, Easy Scaling

Data Factory

The Data Factory in Starfish is a Python library component designed to transform any function into a scalable data pipeline. It allows for parallel processing of inputs, automatic retries, error handling, and job resumption. Data Factory provides mechanisms for managing state, extending functionality with custom hooks, and persisting metadata and data artifacts. It supports structured LLM outputs and flexibility in model selection. README.md

Overview

Data Factory enables users to define how their data should look and scale seamlessly from experiments to production. It offers features such as structured outputs using JSON schemas or Pydantic models, model flexibility supporting various LLM providers, dynamic prompts with Jinja2 templates, and easy scaling with a single decorator. The system provides a resilient pipeline with automatic retries, error handling, and job resumption, allowing users to pause and continue data generation at any time. It also offers complete control through shared state across the pipeline and extensible functionality with custom hooks. README.md

Key Features

  • Scalable Data Pipelines: Transforms any function into a scalable data pipeline with a single decorator. README.md
  • Parallel Processing: Executes functions in parallel across thousands of inputs. README.md
  • Resilient Pipeline: Provides automatic retries, error handling, and job resumption. README.md
  • State Management: Allows sharing state across the pipeline. README.md
  • Custom Hooks: Extends functionality with custom hooks. README.md
  • Structured Outputs: Supports structured data through JSON schemas or Pydantic models. README.md
  • Model Flexibility: Works with any LLM provider. README.md
  • Dynamic Prompts: Uses Jinja2 templates for dynamic prompts (see the standalone sketch after this list). README.md
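
The dynamic-prompt feature can be illustrated with plain Jinja2. A minimal standalone sketch (the template text below is invented for illustration and is not from the library):

from jinja2 import Template

# Render a prompt template with runtime values, as a dynamic-prompt feature would.
prompt_template = Template("Generate {{ num_records }} question/answer pairs about {{ city }}.")
prompt = prompt_template.render(num_records=5, city="Tokyo")
print(prompt)  # Generate 5 question/answer pairs about Tokyo.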

Architecture

The Data Factory architecture involves several key components that work together to enable scalable and resilient data generation. These components include the data_factory decorator, task runners, state management, storage interfaces, and metadata models.

At a high level, a user-defined function is decorated with @data_factory, which configures and manages its execution. The decorated function is executed by a Task Runner, which handles parallel processing, retries, and error handling. The Task Runner interacts with an LLM or other data generation process, and the resulting data artifacts are stored via a Storage Interface. State Management allows state to be shared across the pipeline, and Hooks extend the pipeline with custom logic. Metadata about the data generation process is also persisted through the Storage Interface. Sources: src/starfish/data_factory/factory.py, src/starfish/data_factory/task_runner.py, src/starfish/data_factory/storage/models.py

Data Factory Decorator

The @data_factory decorator is central to the Data Factory: it transforms a regular function into a scalable data pipeline. src/starfish/data_factory/factory.py The decorator accepts parameters that configure the data generation process, such as max_concurrency in the example below:

from starfish import data_factory

@data_factory(max_concurrency=50)
async def parallel_qna_llm(city):
    # ... data generation logic; qna_llm is a structured LLM configured elsewhere ...
    response = await qna_llm.run(city=city)
    return response.data

Sources: src/starfish/data_factory/factory.py, README.md

Task Runner

The Task Runner is responsible for executing the data generation tasks: it handles parallel processing, retries, and error handling, limits the number of concurrent tasks with an asyncio.Semaphore, and manages each task's state while persisting metadata about its execution. src/starfish/data_factory/task_runner.py
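
The exact retry and state logic lives in task_runner.py; as a rough illustration of the concurrency-limiting pattern only (not the actual TaskRunner implementation), a semaphore-bounded runner with simple retries might look like this:

import asyncio

async def run_tasks(fn, inputs, max_concurrency=50, max_retries=3):
    # Bound the number of tasks running at once, similar in spirit to the Task Runner.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item):
        async with semaphore:
            for attempt in range(1, max_retries + 1):
                try:
                    return await fn(item)
                except Exception:
                    if attempt == max_retries:
                        raise  # give up after the final attempt
                    await asyncio.sleep(2 ** attempt)  # back off before retrying

    # return_exceptions=True keeps one failure from cancelling the remaining tasks.
    return await asyncio.gather(*(run_one(i) for i in inputs), return_exceptions=True)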

State Management

State Management allows state to be shared across the data generation pipeline. The State class provides methods for setting, getting, and updating state variables, and the state is persisted through the storage interface to support job resumption and error recovery. src/starfish/data_factory/utils/state.py

from starfish.data_factory.utils.state import State

state = State(initial_values={"completed_count": 0})
state.set("completed_count", 1)
state.update({"completed_count": 2})

Sources: src/starfish/data_factory/utils/state.py, src/starfish/data_template/templates/starfish/get_city_info_wf.py

Storage Interface

The Storage Interface provides a pluggable interface for persisting metadata and data artifacts. It defines methods for saving and retrieving projects, jobs, and records, and supports different storage backends. src/starfish/data_factory/storage/local/local_storage.py

The LocalStorage class implements this interface for local use, storing metadata in SQLite and data artifacts as JSON files. src/starfish/data_factory/storage/local/local_storage.py
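
A rough sketch of what such a pluggable backend could look like; the class and method names below are illustrative assumptions, not the actual LocalStorage API:

import abc
import json
import sqlite3
from pathlib import Path

class Storage(abc.ABC):
    """Illustrative pluggable interface: persist metadata and data artifacts."""

    @abc.abstractmethod
    def save_record(self, record_id: str, metadata: dict, data: dict) -> None: ...

    @abc.abstractmethod
    def get_record(self, record_id: str) -> dict: ...

class SQLiteJSONStorage(Storage):
    """Hypothetical local backend: SQLite for metadata, JSON files for data."""

    def __init__(self, root: Path):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        self.db = sqlite3.connect(self.root / "metadata.db")
        self.db.execute("CREATE TABLE IF NOT EXISTS records (record_id TEXT PRIMARY KEY, metadata TEXT)")

    def save_record(self, record_id: str, metadata: dict, data: dict) -> None:
        (self.root / f"{record_id}.json").write_text(json.dumps(data))   # data artifact as JSON
        self.db.execute("INSERT OR REPLACE INTO records VALUES (?, ?)",
                        (record_id, json.dumps(metadata)))               # metadata row in SQLite
        self.db.commit()

    def get_record(self, record_id: str) -> dict:
        return json.loads((self.root / f"{record_id}.json").read_text())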

Metadata Models

The models.py file defines the data models used to persist metadata about the data generation process. src/starfish/data_factory/storage/models.py These models include Project, GenerationMasterJob, GenerationJob, and Record:

import uuid

from starfish.data_factory.storage.models import Project, GenerationMasterJob, GenerationJob, Record

project = Project(project_id=str(uuid.uuid4()), name="Workflow Test Project")

Sources: src/starfish/data_factory/storage/models.py, tests/data_factory/storage/test_storage_main.py

Workflow Execution

The Data Factory workflow involves several steps, from project creation to record retrieval.

  1. Project Creation: A project is created to group related data generation jobs. tests/data_factory/storage/test_local_storage.py
  2. Master Job Creation: A master job is created to define the overall data generation task. tests/data_factory/storage/test_local_storage.py
  3. Execution Job Creation: Execution jobs are created to execute the data generation task in parallel. tests/data_factory/storage/test_local_storage.py
  4. Record Generation: Records are generated by the data generation function. tests/data_factory/storage/test_local_storage.py
  5. Record Storage: Records are stored using the storage interface. tests/data_factory/storage/test_local_storage.py
  6. Job Completion: Execution jobs and master jobs are marked as completed. tests/data_factory/storage/test_local_storage.py
  7. Data Retrieval: Data is retrieved from the storage interface. tests/data_factory/storage/test_local_storage.py

In this workflow, the user defines a data generation function and configures Data Factory with the @data_factory decorator. Data Factory submits tasks to the Task Runner, which executes the data generation function, saves record data to the Storage, logs record metadata, and returns the results to the user, as sketched below. Sources: src/starfish/data_factory/factory.py, src/starfish/data_factory/task_runner.py, src/starfish/data_factory/storage/local/local_storage.py
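
As a hedged sketch of the lifecycle above — the storage method names here are assumptions chosen to mirror the numbered steps, not the actual interface:

import uuid

async def run_workflow(storage, generate_fn, inputs):
    project_id = str(uuid.uuid4())
    await storage.save_project(project_id=project_id, name="demo-project")       # 1. Project creation
    master_job_id = await storage.save_master_job(project_id=project_id)         # 2. Master job creation
    for item in inputs:
        job_id = await storage.save_execution_job(master_job_id=master_job_id)   # 3. Execution job creation
        record = await generate_fn(item)                                         # 4. Record generation
        await storage.save_record(job_id=job_id, data=record)                    # 5. Record storage
        await storage.complete_execution_job(job_id)                             # 6. Job completion (execution)
    await storage.complete_master_job(master_job_id)                             # 6. Job completion (master)
    return await storage.get_records(master_job_id)                              # 7. Data retrieval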

Example Usage

from starfish import data_factory

@data_factory(max_concurrency=50)
async def parallel_qna_llm(city):
    # This could be any arbitrary complex workflow:
    # - Pre-processing
    # - Multiple LLM calls
    # - Post-processing
    # - Error handling
    response = await qna_llm.run(city=city)
    return response.data

cities = ["San Francisco", "New York", "Tokyo", "Paris", "London"] * 20
results = parallel_qna_llm.run(city=cities)

This example demonstrates how to use the @data_factory decorator to transform a function into a scalable data pipeline. The parallel_qna_llm function is decorated with @data_factory, which configures it to run with a maximum concurrency of 50. The run method is then called with a list of cities, which will be processed in parallel. README.md
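
The example assumes qna_llm is a structured LLM configured elsewhere. One possible configuration, based on the structured-output and dynamic-prompt features described above (parameter names are illustrative and may differ from the actual StructuredLLM API):

from starfish import StructuredLLM  # structured-output wrapper mentioned in the README

# Hypothetical setup for the qna_llm used above; field and parameter names are assumptions.
qna_llm = StructuredLLM(
    model_name="openai/gpt-4o-mini",                          # any supported LLM provider/model
    prompt="Generate a question and answer about {{city}}.",  # Jinja2-style dynamic prompt
    output_schema=[{"name": "question", "type": "str"},       # structured output fields
                   {"name": "answer", "type": "str"}],
)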

Conclusion

Data Factory provides a flexible and scalable solution for data generation. It enables users to define their data generation logic and scale it seamlessly from experiments to production. With features such as parallel processing, automatic retries, error handling, and state management, Data Factory simplifies the process of generating large datasets.