Related topics: Data Factory

Easy Scaling

Starfish provides a mechanism for easy scaling of data generation workflows using the data_factory decorator. This allows any function to be transformed into a scalable data pipeline, enabling parallel processing across multiple inputs. The scaling is achieved through concurrent workers, making it suitable for both experimentation and production environments. Sources: README.md

The data_factory decorator simplifies the process of parallelizing data generation tasks. It handles the complexities of concurrency, error handling, and job resumption, allowing developers to focus on the core logic of their data generation functions. This approach supports various LLM providers and dynamic prompts, making it a flexible solution for diverse data generation needs. Sources: README.md

Data Factory Decorator

The @data_factory decorator is central to easy scaling in Starfish. It transforms a regular function into a parallel processing pipeline. Sources: src/starfish/data_factory/factory.py:15-20

from starfish import data_factory

@data_factory(max_concurrency=50)
async def parallel_qna_llm(city):
    # Your data generation logic here, for example a structured LLM call
    response = await qna_llm.run(city=city)
    return response.data
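
Once decorated, the function is not called directly; its run() method is invoked with a list of inputs, which the factory fans out across the concurrent workers. A minimal usage sketch, with illustrative city names:

# Fan the inputs out across up to 50 concurrent workers and collect the results
results = parallel_qna_llm.run(city=["San Francisco", "New York", "Tokyo"])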

Key Features

  • Concurrency Management: The max_concurrency parameter controls the number of concurrent workers. Sources: src/starfish/data_factory/factory.py:43-46
  • Automatic Retries: The system automatically retries failed tasks. Sources: README.md
  • Error Handling: Built-in error handling mechanisms are in place. Sources: README.md
  • Job Resumption: The pipeline can be paused and resumed. Sources: README.md
  • State Management: Allows sharing state across the pipeline. Sources: README.md

Data Factory Workflow

The data factory workflow proceeds in three stages: a function is decorated with @data_factory, the input data is processed in parallel across concurrent workers, and the results are collected after post-processing. Sources: examples/data_factory.ipynb

Scaling Any Workflow

The data_factory decorator can be applied to any function, regardless of its complexity. This includes simple functions and complex workflows involving pre-processing, multiple LLM calls, post-processing, and error handling. Sources: examples/data_factory.ipynb

Example

from starfish import data_factory

@data_factory(max_concurrency=50)
async def parallel_qna_llm(city):
    # qna_llm is a structured LLM object defined earlier in the notebook; the
    # surrounding workflow can add pre-processing, multiple LLM calls,
    # post-processing, and error handling around this call
    response = await qna_llm.run(city=city)
    return response.data

cities = ["San Francisco", "New York", "Tokyo", "Paris", "London"] * 20  # 100 inputs
results = parallel_qna_llm.run(city=cities)

This code snippet demonstrates how to use the data_factory decorator to scale a question-answering workflow across multiple cities. Sources: examples/data_factory.ipynb

Workflow Steps

  1. Define a Function: Create a function that encapsulates the data generation logic. Sources: examples/data_factory.ipynb
  2. Apply the Decorator: Decorate the function with @data_factory, specifying the desired concurrency level. Sources: src/starfish/data_factory/factory.py:15-20
  3. Run the Pipeline: Call the decorated function's run() method with a list of inputs, as shown in the sketch below. Sources: examples/data_factory.ipynb
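
Putting these steps together, the following minimal sketch runs end to end. The function body is illustrative rather than taken from the Starfish examples; it avoids an LLM call to emphasize that any async function can be scaled this way.

from starfish import data_factory

# Step 2: apply the decorator with the desired concurrency level
@data_factory(max_concurrency=10)
async def tag_city(city):
    # Step 1: the data generation logic, here a trivial record per input
    return [{"city": city, "tag": city[:3].lower()}]

# Step 3: run the pipeline with a list of inputs
results = tag_city.run(city=["Berlin", "Madrid", "Rome"])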

Resuming Jobs

The data_factory provides functionality to resume jobs from where they left off. This is useful for long-running data generation tasks that may be interrupted. Sources: README.md

Example

# Pick up a previously started job from where it left off
results = parallel_qna_llm.resume()

This code snippet shows how to resume a data generation job using the resume() method. Sources: examples/data_factory.ipynb

State Management and Hooks

data_factory allows users to register hooks that read and modify the shared state of the workflow at runtime, for example via the on_record_complete parameter. These hooks can be used to implement custom logic for error handling, data validation, or other tasks. Sources: tests/data_factory/factory/test_run.py

from starfish import data_factory, STATUS_COMPLETED

# Hook registered via on_record_complete: it receives a record's output and the
# shared state object, and returns a status constant
def test_hook(data, state):
    state.update({"variable": f"changed_state - {data}"})
    return STATUS_COMPLETED

@data_factory(max_concurrency=2, on_record_complete=[test_hook], initial_state_values={"variable": "initial_state"})
async def test1(city_name, num_records_per_city, fail_rate=0.1, sleep_time=0.05):
    # mock_llm_call is a helper from the Starfish test suite that simulates an
    # LLM call with a configurable failure rate and latency
    return await mock_llm_call(city_name, num_records_per_city, fail_rate=fail_rate, sleep_time=sleep_time)

In this example, test_hook modifies the state of the workflow by updating the variable key with a new value. Sources: tests/data_factory/factory/test_run.py

Data Factory with Hooks Workflow

During execution of a data generation workflow with hooks, the user starts the run, the data factory dispatches the individual tasks, and the registered hooks are invoked as records complete so they can update the shared state. Sources: tests/data_factory/factory/test_run.py

Data Storage

The LocalStorage class provides methods for saving and retrieving data artifacts, such as request configurations and record data. This class is essential for managing the data generated during the scaling process. Sources: src/starfish/data_factory/storage/local/local_storage.py

Data Persistence

Two representative methods illustrate the persistence interface:

async def save_request_config(self, config_ref: str, config_data: Dict[str, Any]) -> str:
    return await self._data_handler.save_request_config_impl(config_ref, config_data)

async def get_record_data(self, output_ref: str) -> Dict[str, Any]:
    return await self._data_handler.get_record_data_impl(output_ref)

These methods allow the storage and retrieval of request configurations and record data, which are crucial for managing the data generation process. Sources: src/starfish/data_factory/storage/local/local_storage.py
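
A minimal sketch of how these methods might be called, assuming storage is an already-initialized LocalStorage instance; the reference keys and configuration contents are illustrative, not values produced by Starfish:

from typing import Any, Dict

async def persist_and_reload(storage) -> Dict[str, Any]:
    # Persist the request configuration and keep the returned reference string
    config_ref = await storage.save_request_config(
        "job-001/request-config", {"max_concurrency": 50, "prompt": "..."}
    )
    print(f"Saved request config as {config_ref}")
    # Fetch a previously generated record by its output reference
    return await storage.get_record_data("job-001/record-0")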

Configuration

Starfish uses environment variables for configuration, including API keys and model configurations. A .env.template file is provided to help users get started. Sources: README.md
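
For example, the environment can be loaded at the start of a script. This is a minimal sketch assuming the python-dotenv package is used to read the copied .env file; the OPENAI_API_KEY variable name is illustrative:

import os
from dotenv import load_dotenv  # assumes the python-dotenv package is installed

# Load variables from the .env file created by copying .env.template
load_dotenv()
api_key = os.environ["OPENAI_API_KEY"]  # illustrative variable name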

Conclusion

Easy scaling in Starfish, achieved through the data_factory decorator, simplifies the creation and execution of parallel data generation pipelines. It provides features like concurrency management, automatic retries, error handling, and job resumption, making it a powerful tool for both experimentation and production environments.

