Utilities API¶
reaktiv.untracked(func_or_signal=None)¶
Execute a function without establishing dependencies.
This utility prevents dependency tracking, useful when you want to read signal values without creating reactive relationships. Use as a context manager (recommended) or as a function wrapper.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| func_or_signal | Union[Callable[[], T], object, None] | Optional function to execute, or None for context manager usage | None |
Returns:

| Type | Description |
|---|---|
| Union[T, ContextManager[None]] | The result of the function if provided, or a context manager if None |
Examples:
As a context manager (recommended):
```python
from reaktiv import Signal, Effect, untracked

count = Signal(0)
other = Signal(100)

def my_effect():
    # Tracked read
    c = count()
    # Untracked reads within this block
    with untracked():
        o = other()
    print(f"Count: {c}, Other: {o}")

# Keep reference to prevent GC
effect = Effect(my_effect)
# Prints: "Count: 0, Other: 100"

other.set(200)
# No print - 'other' was read in untracked context
```
As a function wrapper:
```python
from reaktiv import Signal, Effect, untracked

count = Signal(0)
other = Signal(100)

def my_effect():
    # This creates a dependency
    c = count()
    # This does NOT create a dependency
    o = untracked(lambda: other())
    print(f"Count: {c}, Other: {o}")

# Keep reference to prevent GC
effect = Effect(my_effect)
# Prints: "Count: 0, Other: 100"

count.set(1)
# Prints: "Count: 1, Other: 100" (effect re-runs)

other.set(200)
# No print - effect doesn't depend on 'other'
```
reaktiv.batch()¶
Batch multiple signal updates to prevent redundant effect executions.
When you update multiple signals, each change normally triggers effects immediately.
Using batch() defers all effect executions until the batch completes, ensuring
each effect runs only once with the final values.
Yields:

| Type | Description |
|---|---|
| None | None |
Examples:
Without batching:
```python
from reaktiv import Signal, Effect

x = Signal(1)
y = Signal(2)

def log_values():
    print(f"x: {x()}, y: {y()}")

# Keep reference to prevent GC
effect = Effect(log_values)
# Prints: "x: 1, y: 2"

x.set(10)  # Effect runs
# Prints: "x: 10, y: 2"

y.set(20)  # Effect runs again
# Prints: "x: 10, y: 20"
```
With batching:
```python
from reaktiv import Signal, Effect, batch

x = Signal(1)
y = Signal(2)

def log_values():
    print(f"x: {x()}, y: {y()}")

# Keep reference to prevent GC
effect = Effect(log_values)
# Prints: "x: 1, y: 2"

with batch():
    x.set(10)  # No effect execution yet
    y.set(20)  # No effect execution yet

# After batch completes: Effect runs once
# Prints: "x: 10, y: 20"
```
Multiple updates:
```python
from reaktiv import Signal, Effect, batch

items = Signal([])

def log_count():
    print(f"Items: {len(items())}")

# Keep reference to prevent GC
effect = Effect(log_count)
# Prints: "Items: 0"

# Without batch, effect would run 3 times
with batch():
    items.set([1])
    items.set([1, 2])
    items.set([1, 2, 3])

# Effect runs once with final value
# Prints: "Items: 3"
```
Nested batches:
reaktiv.to_async_iter(signal, initial=True) async¶
Convert a signal to an async iterator that yields on each change.
This utility allows you to use signals with Python's async iteration syntax, making it easy to integrate reaktiv with async code and frameworks.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| signal | ReadableSignal[T] | The signal to convert into an async iterator | required |
| initial | bool | Whether to yield the current value immediately (True) or only yield on changes (False) | True |
Yields:

| Type | Description |
|---|---|
| AsyncIterator[T] | The signal's value each time it changes |
Examples:
Basic usage:
```python
import asyncio
from reaktiv import Signal, to_async_iter

counter = Signal(0)

async def watch_counter():
    async for value in to_async_iter(counter):
        print(f"Counter: {value}")
        if value >= 3:
            break

async def main():
    # Start watching
    task = asyncio.create_task(watch_counter())

    # Make changes
    await asyncio.sleep(0.1)
    counter.set(1)
    await asyncio.sleep(0.1)
    counter.set(2)
    await asyncio.sleep(0.1)
    counter.set(3)

    await task

asyncio.run(main())
# Prints:
# Counter: 0
# Counter: 1
# Counter: 2
# Counter: 3
```
Skip initial value:
```python
import asyncio
from reaktiv import Signal, to_async_iter

status = Signal("idle")

async def watch_status():
    # Only yield on changes, not the initial value
    async for value in to_async_iter(status, initial=False):
        print(f"Status changed to: {value}")
        if value == "done":
            break

async def main():
    task = asyncio.create_task(watch_status())
    await asyncio.sleep(0.1)
    status.set("loading")  # Prints: "Status changed to: loading"
    await asyncio.sleep(0.1)
    status.set("done")     # Prints: "Status changed to: done"
    await task

asyncio.run(main())
```
Integration with async frameworks:
```python
import asyncio
from reaktiv import Signal, to_async_iter

data_signal = Signal(None)

async def send_to_api(data):
    # Stub for illustration - replace with a real API call
    ...

async def process_data_stream():
    async for data in to_async_iter(data_signal):
        if data is not None:
            # Process data
            await send_to_api(data)

async def main():
    processor = asyncio.create_task(process_data_stream())
    # Simulate data updates
    data_signal.set({"id": 1})
    await asyncio.sleep(0.1)
    data_signal.set({"id": 2})
    # ... continue processing
```