pyperly

A functional pipeline library for Python with support for synchronous and asynchronous operations.

pyperly provides a clear and expressive way to build data processing pipelines. It allows you to chain a series of functions (transformations) in a fluent, readable manner, inspired by functional programming concepts. Whether you're working with synchronous code or complex async workflows, pyperly helps you write cleaner, more maintainable data-centric logic.

Key Features

  • Fluent Interface: Chain operations together in a natural and readable way.
  • Sync & Async Support: Seamlessly mix and match synchronous and asynchronous functions in your pipelines.
  • Error Handling: Gracefully handle exceptions using the Result object, preventing crashes.
  • Side Effects: Perform actions like logging or database writes without interrupting the main data flow using apply.
  • Conditional Logic: Use ensure to validate data at any stage of the pipeline and branch logic accordingly.

Installation

System Requirements:

  • Python 3.10+

To install the library, run the following command:

pip install pyperly

Getting Started

Here's a simple example to get you started. Let's create a pipeline that takes a number, adds 10, and then doubles the result.

from pyperly import let

# Define the processing functions
def add_ten(n):
    return n + 10

def double(n):
    return n * 2

# Create and run the pipeline
initial_value = 5
result = let(initial_value).bind(add_ten).bind(double).run()

print(f"The result is: {result}")
# Output: The result is: 30

Usage Examples

This section provides more detailed examples demonstrating various features of pyperly.

1. Basic Synchronous Pipeline

This example shows a simple chain of synchronous functions. The pipeline starts with a string, processes it, and returns the length.

from pyperly import let

def to_uppercase(text: str) -> str:
    print(f"Uppercasing: '{text}'")
    return text.upper()

def add_exclamation(text: str) -> str:
    print(f"Adding exclamation to: '{text}'")
    return f"{text}!"

def get_length(text: str) -> int:
    return len(text)

# Start with an initial value and chain the functions
pipeline = let("hello world").bind(to_uppercase).bind(add_exclamation).bind(get_length)

# The .run() method executes all steps in order
result = pipeline.run()

print(f"\nFinal length: {result}")
# Output:
# Uppercasing: 'hello world'
# Adding exclamation to: 'HELLO WORLD'
#
# Final length: 12

2. Asynchronous Pipeline

pyperly handles async functions just as easily. Here's a pipeline that simulates fetching data from a web API and then processing it.

import asyncio
from pyperly import alet

# Simulate an async API call
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1)  # Simulate network latency
    return {"id": user_id, "name": "John Doe", "email": "john.doe@example.com"}

# An async function to extract a specific field
async def get_field(data: dict, field: str) -> str:
    print(f"Extracting field '{field}'...")
    await asyncio.sleep(0.1)
    return data.get(field, "N/A")

async def main():
    # Use alet() to start an async pipeline
    # Note: the first function is passed directly to alet()
    user_email = await alet(fetch_user_data, 101).bind(get_field, field="email").arun()

    print(f"\nUser email: {user_email}")

asyncio.run(main())
# Output:
# Fetching data for user 101...
# Extracting field 'email'...
#
# User email: john.doe@example.com

3. Handling Side Effects with apply

Sometimes you need to perform an action that doesn't change the data, like logging. Use the apply method for this. The value passed to the next step remains unchanged.

from pyperly import let

def log_value(value):
    # This function is for a side effect (printing)
    print(f"[LOG] Current value: {value}")
    # It doesn't need to return anything meaningful

def multiply_by_three(n):
    return n * 3

result = (
    let(10)
    .apply(log_value)
    .bind(multiply_by_three)
    .apply(log_value)
    .run()
)

print(f"\nFinal result: {result}")
# Output:
# [LOG] Current value: 10
# [LOG] Current value: 30
#
# Final result: 30

4. Validation with ensure

You can validate the data at any point in the pipeline using ensure. If the condition fails, the pipeline stops and returns None, or continues with a default value if one is provided.

from pyperly import let

def is_positive(n):
    return n > 0

# Scenario 1: Validation passes
pipeline_success = (
    let(20)
    .ensure(is_positive)
    .bind(lambda x: x - 5)
)
result_success = pipeline_success.run()
print(f"Success case result: {result_success}") # Output: 15

# Scenario 2: Validation fails, pipeline stops and returns None
pipeline_fail = (
    let(-10)
    .ensure(is_positive)
    .bind(lambda x: x - 5) # This step is never reached
)
result_fail = pipeline_fail.run()
print(f"Failure case result: {result_fail}") # Output: None

# Scenario 3: Validation fails, but a default value is provided
pipeline_default = (
    let(-10)
    .ensure(is_positive, default=0) # If ensure fails, the pipeline continues with 0
    .bind(lambda x: x + 100)
)
result_default = pipeline_default.run()
print(f"Default case result: {result_default}") # Output: 100

5. Using the & Operator for Concise Pipelines

For a more compact and expressive style, you can use the & operator as an alternative to chaining method calls. It accepts bare callables (including lambdas), which behave like bind, as well as the bind, apply, and ensure wrappers, allowing you to build sophisticated pipelines with minimal boilerplate.

from pyperly import let, bind, apply, ensure

# A pipeline to process a list of numbers
# 1. Start with a list.
# 2. Ensure the list is not empty, otherwise default to [0].
# 3. Log the initial list (side effect).
# 4. Sum the numbers in the list.
# 5. Add 100 to the sum.
pipeline = (
    let([1, 2, 3])
    & ensure(lambda x: len(x) > 0, default=[0])
    & apply(lambda x: print(f"Processing list: {x}"))
    & bind(sum)
    & (lambda total: total + 100)
)

result = pipeline.run()
print(f"Final result: {result}")
# Output:
# Processing list: [1, 2, 3]
# Final result: 106

# Example with a failing validation
empty_list_result = (let([]) & ensure(lambda x: len(x) > 0, default=[0]) & bind(sum)).run()
print(f"\nResult with empty list: {empty_list_result}")
# Output:
# Result with empty list: 0

6. Error Handling with result

To safely execute a pipeline that might raise an exception, use the .result() or .aresult() method. It returns a Result object: if ok is true, value holds the final result; otherwise error holds the exception.

from pyperly import let

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Successful execution
result_ok = let(10).bind(divide, 2).result()
if result_ok.ok:
    print(f"Success: {result_ok.value}")
else:
    print(f"Error: {result_ok.error}")
# Output: Success: 5.0

# Execution with an error
result_err = let(10).bind(divide, 0).result()
if result_err.ok:
    print(f"Success: {result_err.value}")
else:
    print(f"Error: {result_err.error}")
# Output: Error: Cannot divide by zero

API Overview

The core of the library revolves around the Pipeline object and a few key functions. Most functions that accept a callable (like bind, apply, and ensure) also accept a common set of keyword arguments to control their behavior.

Core Functions

  • let(value) or let(fn, *args, **kwargs): Creates a new synchronous pipeline.
  • alet(coro, *args, **kwargs): Creates a new asynchronous pipeline.
  • pipeline.bind(fn, *args, **kwargs): Chains a transformation. The return value of fn becomes the new value in the pipeline.
  • pipeline.abind(coro, *args, **kwargs): Chains an asynchronous transformation (see the sketch after this list).
  • pipeline.apply(fn, *args, **kwargs): Executes a function for side effects. The pipeline's value is not modified.
  • pipeline.aapply(coro, *args, **kwargs): Executes an async function for side effects.
  • pipeline.ensure(predicate, **kwargs): Validates the pipeline's current value. If the predicate is false, the pipeline stops (returning None) or continues with the default value if provided.
  • pipeline.aensure(coro, **kwargs): Asynchronous validation.
  • pipeline.run(is_async: bool = False, allow_none: bool = False): Executes a synchronous pipeline and returns the final value.
  • pipeline.arun(allow_none: bool = False): Executes an asynchronous pipeline and returns the final value.
  • pipeline.result(is_async: bool = False, allow_none: bool = False): Executes the pipeline and returns a Result object, capturing any exceptions.
  • pipeline.aresult(allow_none: bool = False): Executes an async pipeline and returns a Result object.
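
The async variants mirror their sync counterparts. The following is a minimal sketch combining bind, abind, and aapply, based only on the signatures listed above; fetch_score, halve, and log_value are hypothetical helpers defined here for illustration.

import asyncio
from pyperly import alet

async def fetch_score(user_id: int) -> int:
    await asyncio.sleep(0.1)  # simulate I/O
    return user_id * 2

async def halve(n: int) -> float:
    return n / 2

async def log_value(value) -> None:
    print(f"[LOG] value: {value}")

async def main():
    # alet() starts the async pipeline; bind() chains a sync step,
    # abind() chains an async step, aapply() runs an async side effect,
    # and arun() awaits the final value.
    result = await (
        alet(fetch_score, 101)   # 202
        .bind(lambda n: n + 8)   # 210
        .abind(halve)            # 105.0
        .aapply(log_value)       # logs the value; pipeline value unchanged
        .arun()
    )
    print(f"Result: {result}")

asyncio.run(main())
# Expected output:
# [LOG] value: 105.0
# Result: 105.0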

Execution Parameters

The run, arun, result, and aresult methods accept the following parameters to control pipeline execution:

  • is_async: bool: When set to True in run() or result(), it forces the pipeline to execute asynchronously, even if it only contains synchronous steps. This is useful for consistent execution in mixed environments.
  • allow_none: bool: Sets the pipeline-wide default for handling None. If a step does not specify its own allow_none, this value is used; a step-specific allow_none always overrides it for that particular step. Defaults to False. See the sketch after this list.
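
For example, here is a hedged sketch of run-level allow_none per the description above; find_missing is a hypothetical helper that returns None for an absent key.

from pyperly import let

def find_missing(d: dict):
    return d.get("missing")  # returns None when the key is absent

# Default (allow_none=False): the pipeline stops when find_missing returns None
print(repr(let({"a": 1}).bind(find_missing).bind(str).run()))
# Expected: None

# allow_none=True: the None is passed along, so str() receives it
print(repr(let({"a": 1}).bind(find_missing).bind(str).run(allow_none=True)))
# Expected: 'None'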

Common Parameters

These parameters can be used with bind, abind, apply, aapply, ensure, and aensure to customize their behavior:

  • result_kw: str: Instead of passing the pipeline's current value as the first positional argument to the function, this parameter allows you to pass it as a keyword argument. This is useful for functions where you can't or don't want to change the signature.

    def process_data(data, config):
        return data + config
    
    # The pipeline value (10) will be passed as the 'data' argument.
    let(10).bind(process_data, result_kw="data", config=5).run() # Result: 15
  • is_async: bool: Explicitly tells the pipeline to treat a function as asynchronous, even if it's not a coroutine function defined with async def. This is an advanced use case, typically for functions that return an awaitable. Defaults to False.

  • allow_none: bool: Controls how None values are handled.

    • If False (the default), the pipeline will stop execution if a step returns None.
    • If True, None is treated as a valid value and is passed to the next step in the pipeline.
  • default: Any: Provides a fallback value (illustrated after this list).

    • In bind: If the function's result is None, the pipeline will continue with this default value instead of stopping.
    • In ensure: If the predicate returns False, the pipeline will continue with this default value.
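
As a quick illustration of default in bind, the sketch below continues with a fallback when a step yields None; lookup_score is a hypothetical helper.

from pyperly import let

def lookup_score(record: dict):
    return record.get("score")  # may return None

# Without a default, a None result would stop the pipeline;
# with default=0, execution continues using the fallback value.
result = let({}).bind(lookup_score, default=0).bind(lambda n: n + 1).run()
print(result)
# Expected: 1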

Contributing

Contributions are welcome! If you have a suggestion or find a bug, please open an issue or submit a pull request.

  1. Fork the repository.
  2. Create a new branch (git checkout -b feature/my-new-feature).
  3. Make your changes.
  4. Commit your changes (git commit -am 'Add some feature').
  5. Push to the branch (git push origin feature/my-new-feature).
  6. Create a new Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.
