A functional pipeline library for Python with support for synchronous and asynchronous operations.
pyperly provides a clear and expressive way to build data processing pipelines. It allows you to chain a series of functions (transformations) in a fluent, readable manner, inspired by functional programming concepts. Whether you're working with synchronous code or complex async workflows, pyperly helps you write cleaner, more maintainable data-centric logic.
- Fluent Interface: Chain operations together in a natural and readable way.
- Sync & Async Support: Seamlessly mix and match synchronous and asynchronous functions in your pipelines.
- Error Handling: Gracefully handle exceptions using the `Result` object, preventing crashes.
- Side Effects: Perform actions like logging or database writes without interrupting the main data flow using `apply`.
- Conditional Logic: Use `ensure` to validate data at any stage of the pipeline and branch logic accordingly.
System Requirements:
- Python 3.10+
To install the library, run the following command:
```shell
pip install pyperly
```

Here's a simple example to get you started. Let's create a pipeline that takes a number, adds 10, and then doubles the result.
```python
from pyperly import let

# Define the processing functions
def add_ten(n):
    return n + 10

def double(n):
    return n * 2

# Create and run the pipeline
initial_value = 5
result = let(initial_value).bind(add_ten).bind(double).run()

print(f"The result is: {result}")
# Output: The result is: 30
```

This section provides more detailed examples demonstrating various features of pyperly.
This example shows a simple chain of synchronous functions. The pipeline starts with a string, processes it, and returns the length.
```python
from pyperly import let

def to_uppercase(text: str) -> str:
    print(f"Uppercasing: '{text}'")
    return text.upper()

def add_exclamation(text: str) -> str:
    print(f"Adding exclamation to: '{text}'")
    return f"{text}!"

def get_length(text: str) -> int:
    return len(text)

# Start with an initial value and chain the functions
pipeline = let("hello world").bind(to_uppercase).bind(add_exclamation).bind(get_length)

# The .run() method executes all steps in order
result = pipeline.run()

print(f"\nFinal length: {result}")
# Output:
# Uppercasing: 'hello world'
# Adding exclamation to: 'HELLO WORLD'
#
# Final length: 12
```

pyperly handles async functions just as easily. Here's a pipeline that simulates fetching data from a web API and then processing it.
```python
import asyncio
from pyperly import alet

# Simulate an async API call
async def fetch_user_data(user_id: int) -> dict:
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(0.1)  # Simulate network latency
    return {"id": user_id, "name": "John Doe", "email": "john.doe@example.com"}

# An async function to extract a specific field
async def get_field(data: dict, field: str) -> str:
    print(f"Extracting field '{field}'...")
    await asyncio.sleep(0.1)
    return data.get(field, "N/A")

async def main():
    # Use alet() to start an async pipeline
    # Note: the first function is passed directly to alet()
    user_email = await alet(fetch_user_data, 101).bind(get_field, field="email").arun()
    print(f"\nUser email: {user_email}")

asyncio.run(main())
# Output:
# Fetching data for user 101...
# Extracting field 'email'...
#
# User email: john.doe@example.com
```

Sometimes you need to perform an action that doesn't change the data, like logging. Use the `apply` method for this. The value passed to the next step remains unchanged.
```python
from pyperly import let

def log_value(value):
    # This function is for a side effect (printing).
    # It doesn't need to return anything meaningful.
    print(f"[LOG] Current value: {value}")

def multiply_by_three(n):
    return n * 3

result = (
    let(10)
    .apply(log_value)
    .bind(multiply_by_three)
    .apply(log_value)
    .run()
)

print(f"\nFinal result: {result}")
# Output:
# [LOG] Current value: 10
# [LOG] Current value: 30
#
# Final result: 30
```

You can validate the data at any point in the pipeline using `ensure`. If the condition fails, the pipeline stops and returns `None`, or a default value if one is provided.
```python
from pyperly import let

def is_positive(n):
    return n > 0

# Scenario 1: Validation passes
pipeline_success = (
    let(20)
    .ensure(is_positive)
    .bind(lambda x: x - 5)
)
result_success = pipeline_success.run()
print(f"Success case result: {result_success}")  # Output: 15

# Scenario 2: Validation fails, pipeline stops and returns None
pipeline_fail = (
    let(-10)
    .ensure(is_positive)
    .bind(lambda x: x - 5)  # This step is never reached
)
result_fail = pipeline_fail.run()
print(f"Failure case result: {result_fail}")  # Output: None

# Scenario 3: Validation fails, but a default value is provided
pipeline_default = (
    let(-10)
    .ensure(is_positive, default=0)  # If ensure fails, the pipeline continues with 0
    .bind(lambda x: x + 100)
)
result_default = pipeline_default.run()
print(f"Default case result: {result_default}")  # Output: 100
```

For a more compact and expressive style, you can use the `&` operator to compose steps instead of chaining method calls. It works with plain lambdas as well as the `bind`, `apply`, and `ensure` helpers, allowing you to build sophisticated pipelines with minimal boilerplate.
```python
from pyperly import let, bind, apply, ensure

# A pipeline to process a list of numbers:
# 1. Start with a list.
# 2. Ensure the list is not empty, otherwise default to [0].
# 3. Log the initial list (side effect).
# 4. Sum the numbers in the list.
# 5. Add 100 to the sum.
pipeline = (
    let([1, 2, 3])
    & ensure(lambda x: len(x) > 0, default=[0])
    & apply(lambda x: print(f"Processing list: {x}"))
    & bind(sum)
    & (lambda total: total + 100)
)

result = pipeline.run()
print(f"Final result: {result}")
# Output:
# Processing list: [1, 2, 3]
# Final result: 106

# Example with a failing validation
empty_list_result = (let([]) & ensure(lambda x: len(x) > 0, default=[0]) & bind(sum)).run()
print(f"\nResult with empty list: {empty_list_result}")
# Output:
# Result with empty list: 0
```

To safely execute a pipeline that might raise an exception, use the `.result()` or `.aresult()` method. It returns a `Result` object which is either ok or contains an error.
```python
from pyperly import let

def divide(a, b):
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

# Successful execution
result_ok = let(10).bind(divide, 2).result()
if result_ok.ok:
    print(f"Success: {result_ok.value}")
else:
    print(f"Error: {result_ok.error}")
# Output: Success: 5.0

# Execution with an error
result_err = let(10).bind(divide, 0).result()
if result_err.ok:
    print(f"Success: {result_err.value}")
else:
    print(f"Error: {result_err.error}")
# Output: Error: Cannot divide by zero
```

The core of the library revolves around the `Pipeline` object and a few key functions. Most functions that accept a callable (like `bind`, `apply`, and `ensure`) also accept a common set of keyword arguments to control their behavior.
- `let(value)` or `let(fn, *args, **kwargs)`: Creates a new synchronous pipeline.
- `alet(coro, *args, **kwargs)`: Creates a new asynchronous pipeline.
- `pipeline.bind(fn, *args, **kwargs)`: Chains a transformation. The return value of `fn` becomes the new value in the pipeline.
- `pipeline.abind(coro, *args, **kwargs)`: Chains an asynchronous transformation.
- `pipeline.apply(fn, *args, **kwargs)`: Executes a function for side effects. The pipeline's value is not modified.
- `pipeline.aapply(coro, *args, **kwargs)`: Executes an async function for side effects.
- `pipeline.ensure(predicate, **kwargs)`: Validates the pipeline's current value. If the predicate is false, the pipeline stops (returning `None`) or continues with the `default` value if provided.
- `pipeline.aensure(coro, **kwargs)`: Asynchronous validation.
- `pipeline.run(is_async: bool = False, allow_none: bool = False)`: Executes a synchronous pipeline and returns the final value.
- `pipeline.arun(allow_none: bool = False)`: Executes an asynchronous pipeline and returns the final value.
- `pipeline.result(is_async: bool = False, allow_none: bool = False)`: Executes the pipeline and returns a `Result` object, capturing any exceptions.
- `pipeline.aresult(allow_none: bool = False)`: Executes an async pipeline and returns a `Result` object.
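To make the `bind`/`apply`/`ensure` semantics concrete without installing anything, here is a minimal, hypothetical stand-in written for this README. It is not pyperly's implementation, only a sketch of the behavior the reference above describes:

```python
class MiniPipeline:
    """Illustrative stand-in for the semantics described above (not pyperly)."""

    def __init__(self, value):
        self._value = value
        self._stopped = False

    def bind(self, fn, *args, **kwargs):
        # Transformation: fn's return value becomes the new pipeline value.
        if not self._stopped:
            self._value = fn(self._value, *args, **kwargs)
        return self

    def apply(self, fn, *args, **kwargs):
        # Side effect: fn is called, but the pipeline's value is unchanged.
        if not self._stopped:
            fn(self._value, *args, **kwargs)
        return self

    def ensure(self, predicate, default=None):
        # Validation: on failure, stop (yielding None) or continue with `default`.
        if not self._stopped and not predicate(self._value):
            if default is None:
                self._stopped = True
                self._value = None
            else:
                self._value = default
        return self

    def run(self):
        return self._value


# Mirrors the ensure() scenarios from the examples above:
assert MiniPipeline(20).ensure(lambda n: n > 0).bind(lambda x: x - 5).run() == 15
assert MiniPipeline(-10).ensure(lambda n: n > 0).bind(lambda x: x - 5).run() is None
assert MiniPipeline(-10).ensure(lambda n: n > 0, default=0).bind(lambda x: x + 100).run() == 100
```

The same shape applies to the async variants, with coroutines awaited at each step instead of called directly.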
The run, arun, result, and aresult methods accept the following parameters to control pipeline execution:
- `is_async: bool`: When set to `True` in `run()` or `result()`, it forces the pipeline to execute asynchronously, even if it only contains synchronous steps. This is useful for consistent execution in mixed environments.
- `allow_none: bool`: Sets the default behavior for handling `None` for the entire pipeline execution. If a step does not have its own `allow_none` setting, this value is used. However, a step-specific `allow_none` parameter will always override this global setting for that particular step. Defaults to `False`.
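The `allow_none` behavior can be illustrated with a small, hypothetical runner (a sketch for this README, not pyperly's code) in which `None` either short-circuits the chain or flows through as a regular value:

```python
def run_steps(value, steps, allow_none=False):
    # Hypothetical sketch: None stops the chain unless allow_none=True.
    for fn in steps:
        value = fn(value)
        if value is None and not allow_none:
            return None  # short-circuit: remaining steps are skipped
    return value

def maybe_find(n):
    return n if n > 0 else None  # may "fail" by returning None

def describe(v):
    return f"got {v}"

# Default: a None result stops the chain before describe() runs.
assert run_steps(-1, [maybe_find, describe]) is None
# allow_none=True: None flows through to the next step as a normal value.
assert run_steps(-1, [maybe_find, describe], allow_none=True) == "got None"
assert run_steps(5, [maybe_find, describe]) == "got 5"
```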
These parameters can be used with bind, abind, apply, aapply, ensure, and aensure to customize their behavior:
- `result_kw: str`: Instead of passing the pipeline's current value as the first positional argument to the function, this parameter allows you to pass it as a keyword argument. This is useful for functions where you can't or don't want to change the signature.

  ```python
  def process_data(data, config):
      return data + config

  # The pipeline value (10) will be passed as the 'data' argument.
  let(10).bind(process_data, result_kw="data", config=5).run()  # Result: 15
  ```

- `is_async: bool`: Explicitly tells the pipeline to treat a function as asynchronous, even if it's not a coroutine function defined with `async def`. This is an advanced use case, typically for functions that return an awaitable. Defaults to `False`.
- `allow_none: bool`: Controls how `None` values are handled.
  - If `False` (the default), the pipeline will stop execution if a step returns `None`.
  - If `True`, `None` is treated as a valid value and is passed to the next step in the pipeline.
- `default: Any`: Provides a fallback value.
  - In `bind`: If the function's result is `None`, the pipeline will continue with this `default` value instead of stopping.
  - In `ensure`: If the predicate returns `False`, the pipeline will continue with this `default` value.
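The `default` fallback for `bind` can be sketched the same way. The helper below is hypothetical (not part of pyperly); it only illustrates the substitute-on-`None` behavior described above:

```python
def bind_with_default(value, fn, default=None):
    # Hypothetical sketch: if fn returns None, continue with `default`
    # instead of stopping the pipeline.
    result = fn(value)
    return default if result is None else result

def parse_int(text):
    try:
        return int(text)
    except ValueError:
        return None  # signals failure

assert bind_with_default("42", parse_int, default=0) == 42
assert bind_with_default("oops", parse_int, default=0) == 0
```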
Contributions are welcome! If you have a suggestion or find a bug, please open an issue or submit a pull request.
- Fork the repository.
- Create a new branch (`git checkout -b feature/my-new-feature`).
- Make your changes.
- Commit your changes (`git commit -am 'Add some feature'`).
- Push to the branch (`git push origin feature/my-new-feature`).
- Create a new Pull Request.
This project is licensed under the MIT License - see the LICENSE file for details.