Skip to content

Utilities API

reaktiv.untracked(func_or_signal=None)

Execute a function without establishing dependencies.

This utility prevents dependency tracking, useful when you want to read signal values without creating reactive relationships. Use as a context manager (recommended) or as a function wrapper.

Parameters:

Name Type Description Default
func_or_signal Union[Callable[[], T], object, None]

Optional function to execute, or None for context manager usage

None

Returns:

Type Description
Union[T, ContextManager[None]]

The result of the function if provided, or a context manager if None

Examples:

As a context manager (recommended):

from reaktiv import Signal, Effect, untracked

count = Signal(0)
other = Signal(100)

def my_effect():
    # Tracked read
    c = count()

    # Untracked reads within this block
    with untracked():
        o = other()

    print(f"Count: {c}, Other: {o}")

# Keep reference to prevent GC
effect = Effect(my_effect)
# Prints: "Count: 0, Other: 100"

other.set(200)
# No print - 'other' was read in untracked context

As a function wrapper:

from reaktiv import Signal, Effect, untracked

count = Signal(0)
other = Signal(100)

def my_effect():
    # This creates a dependency
    c = count()

    # This does NOT create a dependency
    o = untracked(lambda: other())

    print(f"Count: {c}, Other: {o}")

# Keep reference to prevent GC
effect = Effect(my_effect)
# Prints: "Count: 0, Other: 100"

count.set(1)
# Prints: "Count: 1, Other: 100" (effect re-runs)

other.set(200)
# No print - effect doesn't depend on 'other'

reaktiv.batch()

Batch multiple signal updates to prevent redundant effect executions.

When you update multiple signals, each change normally triggers effects immediately. Using batch() defers all effect executions until the batch completes, ensuring each effect runs only once with the final values.

Yields:

Type Description
None

None

Examples:

Without batching:

from reaktiv import Signal, Effect

x = Signal(1)
y = Signal(2)

def log_values():
    print(f"x: {x()}, y: {y()}")

Effect(log_values)
# Prints: "x: 1, y: 2"

x.set(10)  # Effect runs
# Prints: "x: 10, y: 2"

y.set(20)  # Effect runs again
# Prints: "x: 10, y: 20"

With batching:

from reaktiv import Signal, Effect, batch

x = Signal(1)
y = Signal(2)

def log_values():
    print(f"x: {x()}, y: {y()}")

Effect(log_values)
# Prints: "x: 1, y: 2"

with batch():
    x.set(10)  # No effect execution yet
    y.set(20)  # No effect execution yet
# After batch completes: Effect runs once
# Prints: "x: 10, y: 20"

Multiple updates:

from reaktiv import Signal, Effect, batch

items = Signal([])

def log_count():
    print(f"Items: {len(items())}")

Effect(log_count)
# Prints: "Items: 0"

# Without batch, effect would run 3 times
with batch():
    items.set([1])
    items.set([1, 2])
    items.set([1, 2, 3])
# Effect runs once with final value
# Prints: "Items: 3"

Nested batches:

from reaktiv import Signal, Effect, batch

x = Signal(0)
effect = Effect(lambda: print(f"x={x()}"))  # retain reference
# Prints: "x=0"

with batch():
    x.set(1)
    with batch():
        x.set(2)
        x.set(3)
    x.set(4)
# Effect runs once after outermost batch
# Prints: "x=4"

reaktiv.to_async_iter(signal, initial=True) async

Convert a signal to an async iterator that yields on each change.

This utility allows you to use signals with Python's async iteration syntax, making it easy to integrate reaktiv with async code and frameworks.

Parameters:

Name Type Description Default
signal ReadableSignal[T]

The signal to convert into an async iterator

required
initial bool

Whether to yield the current value immediately (True) or only yield on changes (False)

True

Yields:

Type Description
AsyncIterator[T]

The signal's value each time it changes

Examples:

Basic usage:

import asyncio
from reaktiv import Signal, to_async_iter

counter = Signal(0)

async def watch_counter():
    async for value in to_async_iter(counter):
        print(f"Counter: {value}")
        if value >= 3:
            break

async def main():
    # Start watching
    task = asyncio.create_task(watch_counter())

    # Make changes
    await asyncio.sleep(0.1)
    counter.set(1)
    await asyncio.sleep(0.1)
    counter.set(2)
    await asyncio.sleep(0.1)
    counter.set(3)

    await task

asyncio.run(main())
# Prints:
# Counter: 0
# Counter: 1
# Counter: 2
# Counter: 3

Skip initial value:

import asyncio
from reaktiv import Signal, to_async_iter

status = Signal("idle")

async def watch_status():
    # Only yield on changes, not initial value
    async for value in to_async_iter(status, initial=False):
        print(f"Status changed to: {value}")
        if value == "done":
            break

async def main():
    task = asyncio.create_task(watch_status())

    await asyncio.sleep(0.1)
    status.set("loading")  # Prints: "Status changed to: loading"
    await asyncio.sleep(0.1)
    status.set("done")     # Prints: "Status changed to: done"

    await task

asyncio.run(main())

Integration with async frameworks:

import asyncio
from reaktiv import Signal, to_async_iter

data_signal = Signal(None)

async def process_data_stream():
    async for data in to_async_iter(data_signal):
        if data is not None:
            # Process data
            await send_to_api(data)

async def main():
    processor = asyncio.create_task(process_data_stream())

    # Simulate data updates
    data_signal.set({"id": 1})
    await asyncio.sleep(0.1)
    data_signal.set({"id": 2})

    # ... continue processing