Logs (Time-Series & Stream)
The LogManager is a specialized data structure for Time-Series Data. Unlike a dictionary (where keys are unique) or a list (ordered by insertion), a Log is ordered by Timestamp.
This is ideal for:
- System Telemetry: CPU usage, memory stats, sensor readings.
- Audit Trails: “User X changed setting Y at Time Z”.
- Chat History: Storing messages chronologically.
- Event Sourcing: Storing a sequence of state-changing events.
Quick Start
Initialize a log using db.log().
from beaver import BeaverDB
from datetime import datetime, timedelta
db = BeaverDB("telemetry.db")
metrics = db.log("server_metrics")
# 1. Log Events
# BeaverDB automatically records the current UTC timestamp.
metrics.log({"cpu": 12.5, "ram": 64})
metrics.log({"cpu": 45.0, "ram": 70})
# 2. Log with explicit timestamp
# You can backfill data by providing a datetime object.
yesterday = datetime.now() - timedelta(days=1)
metrics.log({"cpu": 10.0, "ram": 60}, timestamp=yesterday)
# 3. Iterate (Chronological Order)
for timestamp, data in metrics:
    print(f"[{timestamp}] CPU: {data['cpu']}%")
Reading Data
Time Range Queries
The most common operation on logs is fetching data for a specific window (e.g., “Show me errors from the last hour”).
now = datetime.now()
one_hour_ago = now - timedelta(hours=1)
# Efficiently queries the index for this range
recent_logs = metrics.range(start=one_hour_ago, end=now)
print(f"Found {len(recent_logs)} entries in the last hour.")Full Iteration
Full Iteration
You can iterate over the entire history of the log.
for ts, entry in metrics:
    process(entry)  # 'process' is a placeholder for your own handler
Real-Time Analysis (Live Views)
The “Killer Feature” of the LogManager is the .live() iterator. It allows you to build Real-Time Dashboards or Monitoring Alerts without writing complex polling loops or maintaining state.
It works by maintaining a Rolling Window over the most recent data and applying an Aggregator Function to it periodically.
How it Works
- Window: “Look at the last 5 minutes of data.”
- Period: “Update the result every 1 second.”
- Aggregator: “Calculate the average CPU usage.”
import statistics
# Define how to summarize the window
def calculate_avg_cpu(entries: list[dict]) -> float:
    if not entries:
        return 0.0
    values = [e["cpu"] for e in entries]
    return statistics.mean(values)
# Create the live view
# This returns an infinite iterator that blocks until the next period
live_view = metrics.live(
    window=timedelta(minutes=5),   # 5-minute rolling window
    period=timedelta(seconds=1),   # Yield a new result every second
    aggregator=calculate_avg_cpu,
)
print("Starting Live Dashboard...")
for avg_cpu in live_view:
    print(f"Live CPU (5min avg): {avg_cpu:.2f}%")
    # This loop runs forever. Press Ctrl+C to stop.
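Because the aggregator is just a function over the current window, the same mechanism can drive monitoring alerts. The following is a sketch; the threshold and the max-based aggregator are illustrative, not part of the library.
CPU_ALERT_THRESHOLD = 90.0  # illustrative threshold, not a library setting
def max_cpu(entries: list[dict]) -> float:
    # Peak CPU within the rolling window; 0.0 when the window is empty
    return max((e["cpu"] for e in entries), default=0.0)
alerts = metrics.live(
    window=timedelta(minutes=1),   # short window for fast detection
    period=timedelta(seconds=5),   # re-evaluate every 5 seconds
    aggregator=max_cpu,
)
for peak in alerts:
    if peak > CPU_ALERT_THRESHOLD:
        print(f"ALERT: CPU peaked at {peak:.1f}% in the last minute")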
Async Support
For modern web applications (FastAPI, etc.), you can use the async version to stream updates via WebSockets/SSE.
async_logs = db.log("metrics").as_async()
async for avg_cpu in async_logs.live(..., aggregator=calculate_avg_cpu):
    await websocket.send_json({"cpu": avg_cpu})
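A fuller sketch of what such an endpoint might look like with FastAPI is shown below; the app setup, route name, and window/period values are assumptions for illustration, and disconnect handling is omitted.
from datetime import timedelta
from fastapi import FastAPI, WebSocket
from beaver import BeaverDB

app = FastAPI()
db = BeaverDB("telemetry.db")

@app.websocket("/ws/cpu")  # illustrative route name
async def stream_cpu(websocket: WebSocket):
    await websocket.accept()
    async_logs = db.log("server_metrics").as_async()
    live_view = async_logs.live(
        window=timedelta(minutes=5),
        period=timedelta(seconds=1),
        aggregator=calculate_avg_cpu,  # the aggregator defined in the live-view example above
    )
    async for avg_cpu in live_view:
        await websocket.send_json({"cpu": avg_cpu})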
Maintenance
Batching
Just like other managers, use .batched() when ingesting high-frequency logs (e.g., 1000 requests/sec) to prevent database lock contention.
# Ingest a batch of 1,000 sensor readings
with metrics.batched() as batch:
    for reading in sensor_stream:
        batch.log(reading)
Clearing
To rotate logs or wipe history:
metrics.clear()
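.clear() wipes the entire history. If you want rotation rather than a full wipe, one approach is to copy the entries you want to keep, clear the log, and re-insert them with their original timestamps. The sketch below relies only on calls shown above, but it assumes .range() entries unpack as (timestamp, data) pairs and that batch.log() accepts the same timestamp argument as metrics.log(); it is also not atomic, so entries arriving mid-rotation could be lost.
# Sketch: keep only the last 7 days of history (assumptions noted above)
cutoff = datetime.now() - timedelta(days=7)
keep = list(metrics.range(start=cutoff, end=datetime.now()))
metrics.clear()
with metrics.batched() as batch:
    for ts, data in keep:
        batch.log(data, timestamp=ts)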