Metadata-Version: 2.4
Name: ashredis
Version: 1.0.1
Summary: Async Redis ORM for Python
Author-email: Bloodycat <mrblooddycat@gmail.com>
Keywords: redis,orm,async,asyncio,hash
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: redis>=5.0.8

# ashredis - Async Redis ORM for Python
[![PyPI version](https://img.shields.io/pypi/v/ashredis.svg)](https://pypi.org/project/ashredis/)
[![Python versions](https://img.shields.io/pypi/pyversions/ashredis.svg)](https://pypi.org/project/ashredis/)

**ashredis** is an asynchronous Redis Object-Relational Mapping (ORM) library for Python that provides a simple way to interact with Redis using Python objects.
## Features
- Fully asynchronous using `redis.asyncio`
- Type annotations for all fields
- Automatic serialization/deserialization
- TTL (Time-To-Live) support
- Querying and sorting capabilities
- Redis Stream integration
## Installation
```bash
pip install ashredis
```
## Basic Usage
```py 
import asyncio
import time
from datetime import timedelta
from ashredis.model import RedisObject

# Define your Redis-backed model
class User(RedisObject):
    username: str
    email: str
    last_login: int  # timestamp in milliseconds
    
    __category__ = "user"  # Redis key prefix
    __default_key__ = "default_user"  # Default key if not specified

async def main():
    # Create and save a user
    user = User(key="user123")
    user.username = "johndoe"
    user.email = "john@example.com"
    user.last_login = int(time.time() * 1000)
    
    # Save with 1 hour TTL
    await user.save(ttl=timedelta(hours=1))
    
    # Load the user
    await user.load()
    print(user.username)  # "johndoe"
    
    # Context manager usage (auto-closes connection)
    async with User(key="user456") as new_user:
        new_user.username = "janedoe"
        await new_user.save()
        await new_user.load()
        print(new_user.username)  # "janedoe"

asyncio.run(main())
```
# Comprehensive Guide
## Model Definition
Define your Redis-backed models by subclassing `RedisObject`:
```py 
class Product(RedisObject):
    name: str
    price: float
    stock: int
    tags: list  # automatically serialized to JSON
    metadata: dict   # automatically serialized to JSON
    
    __category__ = "product"
```
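To illustrate what "automatically serialized to JSON" can mean for `list` and `dict` fields, here is a minimal standalone sketch. It assumes values end up as flat string fields (as in a Redis hash); `encode_fields` and `decode_fields` are hypothetical helpers written for this example and are not part of the ashredis API, whose internals may differ.

```python
import json
from typing import Any, Dict


def encode_fields(values: Dict[str, Any]) -> Dict[str, str]:
    """Flatten Python values into strings (hypothetical helper, not ashredis code)."""
    encoded = {}
    for name, value in values.items():
        if isinstance(value, (list, dict)):
            # Containers are stored as JSON text
            encoded[name] = json.dumps(value)
        else:
            encoded[name] = str(value)
    return encoded


def decode_fields(raw: Dict[str, str], annotations: Dict[str, type]) -> Dict[str, Any]:
    """Rebuild Python values using the field annotations as a guide."""
    decoded = {}
    for name, text in raw.items():
        target = annotations[name]
        if target in (list, dict):
            decoded[name] = json.loads(text)
        else:
            # str, int and float can be rebuilt by calling the type on the text
            decoded[name] = target(text)
    return decoded


annotations = {"name": str, "price": float, "stock": int, "tags": list, "metadata": dict}
product = {
    "name": "Laptop",
    "price": 999.99,
    "stock": 10,
    "tags": ["electronics", "computers"],
    "metadata": {"color": "grey"},
}
stored = encode_fields(product)
restored = decode_fields(stored, annotations)
```

After the round trip, `restored` compares equal to `product`, which is the behaviour the type annotations on the model are meant to give you.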
## Connection Management
Initialize with Redis connection parameters:
```py 
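from typing import List, Optional, Union


class MyModel(RedisObject):
    # Optional/Union/List keep these annotations valid on Python 3.8 and 3.9;
    # `str | int` and `list[str]` only evaluate on newer versions.
    def __init__(self, key: Optional[Union[str, int]] = None, path: Optional[List[str]] = None):
        super().__init__(
            host="localhost",
            port=6379,
            password="your_password",
            db=0,
            key=key,
            path=path
        )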
```
Or use the model as an async context manager, which closes the connection automatically:
```py 
async with Product(key="prod123") as product:
    await product.load()
```
## CRUD Operations
### Create/Save
```py 
product = Product(key="prod123")
product.name = "Laptop"
product.price = 999.99
product.stock = 10
product.tags = ["electronics", "computers"]
await product.save()

# With TTL (Time-To-Live)
await product.save(ttl=timedelta(days=1))

# With stream event
await product.save(stream=True)
```
### Read/Load
```py 
# Load by key
await product.load(key="prod123")

# Check if exists
exists = await product.load()
if exists:
    print("Product exists!")
```
### Update
```py 
# Load the existing record, change a field, then save it back
await product.load(key="prod123")
product.price = 899.99  # Update price
await product.save()
```
### Delete
```py 
await product.delete(key="prod123")
```
## Querying
### Load All Records
```py 
all_products = await Product().load_all()
```
### With Pagination
```py 
# Get first 10 products
products = await Product().load_all(limit=10)

# Get next 10 products
products = await Product().load_all(offset=10, limit=10)
```
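When you need every record but want to fetch it in fixed-size pages, the `offset`/`limit` pattern above can be wrapped in a loop. The sketch below is an illustration only: `load_in_pages` is a hypothetical helper, and it assumes the fetch function accepts `offset` and `limit` keywords and returns a list, as `load_all` does in the examples above. An in-memory stand-in replaces Redis so the snippet runs on its own.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def load_in_pages(
    fetch_page: Callable[..., Awaitable[List[Any]]],
    page_size: int = 10,
) -> List[Any]:
    """Collect all records by calling fetch_page(offset=..., limit=...) repeatedly."""
    records: List[Any] = []
    offset = 0
    while True:
        page = await fetch_page(offset=offset, limit=page_size)
        records.extend(page)
        # A short (or empty) page means there is nothing left to fetch
        if len(page) < page_size:
            return records
        offset += page_size


# In-memory stand-in for Product().load_all, used only to make the sketch runnable
async def fake_load_all(offset: int = 0, limit: int = 10) -> List[int]:
    data = list(range(25))
    return data[offset:offset + limit]


if __name__ == "__main__":
    everything = asyncio.run(load_in_pages(fake_load_all, page_size=10))
    print(len(everything))
```

With a real model you would pass the bound method instead, for example `await load_in_pages(Product().load_all, page_size=10)`.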
### Load Sorted
```py 
# Sort by price (ascending)
products = await Product().load_sorted("price")

# Sort by price (descending)
products = await Product().load_sorted("price", reverse_sorted=True)
```
### Time-based Queries
```py 
# Get products updated in the last 24 hours
# (assumes the model defines a `last_updated` timestamp field)
recent_products = await Product().load_for_time(
    ts_field="last_updated",
    time_range=timedelta(days=1)
)

# Get sorted by timestamp
recent_products = await Product().load_for_time(
    ts_field="last_updated",
    time_range=timedelta(days=1),
    sort=True
)
```
## Advanced Features
### Hierarchical Keys
```py 
# Creates key like "product:category:electronics:123"
product = Product(key="123", path=["category", "electronics"])
```
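The key layout in the comment above (category, then path segments, then the record key, joined by colons) can be sketched as a small pure function. `build_key` is a hypothetical helper for illustration; it mirrors the example key shown above and is not taken from the ashredis source.

```python
from typing import Optional, Sequence, Union


def build_key(
    category: str,
    key: Union[str, int],
    path: Optional[Sequence[str]] = None,
) -> str:
    """Join category, optional path segments, and key with ':' (illustrative only)."""
    parts = [category, *(path or []), str(key)]
    return ":".join(parts)


full_key = build_key("product", "123", path=["category", "electronics"])
flat_key = build_key("product", 42)
```

Here `full_key` is `"product:category:electronics:123"` and `flat_key` is `"product:42"`.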
### Stream Integration
```py 
# Save with stream event
await product.save(stream=True)

# Get stream events
events = await product.get_stream_in_interval(
    start_ts=int(time.time() * 1000) - 3600000,  # 1 hour ago
    end_ts=int(time.time() * 1000)               # now
)

# Listen for stream events
async def callback(event):
    print("New event:", event.get_dict())

await product.listen_for_stream(callback)
```
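The millisecond arithmetic for `start_ts` and `end_ts` is easy to get wrong by a factor of 1000, so it can help to derive both bounds from a `timedelta`. This is a minimal sketch; `window_ms` is a hypothetical helper and only assumes, as in the example above, that stream intervals are expressed as Unix timestamps in milliseconds.

```python
import time
from datetime import timedelta
from typing import Optional, Tuple


def window_ms(length: timedelta, now_ms: Optional[int] = None) -> Tuple[int, int]:
    """Return (start_ts, end_ts) in milliseconds for a window ending now."""
    # now_ms can be injected to make the result deterministic in tests
    end_ts = int(time.time() * 1000) if now_ms is None else now_ms
    start_ts = end_ts - int(length.total_seconds() * 1000)
    return start_ts, end_ts


start_ts, end_ts = window_ms(timedelta(hours=1))
```

The resulting pair can then be passed as `start_ts=start_ts, end_ts=end_ts` to `get_stream_in_interval`.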
### TTL Management
```py 
# Set TTL
await product.save(ttl=timedelta(hours=2))

# Get remaining TTL
ttl_seconds = await product.get_ttl()
```
