# Cortex Aggregation Engine

Cortex provides a Rust-accelerated aggregation engine for Brainy, enabling real-time analytics on entity data with incremental updates, parallel rebuilds, and precise statistical operations.
## Architecture

Brainy owns the storage and lifecycle. Cortex owns the compute.
```
┌─────────────────────────────────────────┐
│                 Brainy                  │
│  ┌─────────────────────────────────┐    │
│  │        AggregationIndex         │    │
│  │   ├── defineAggregate()         │    │
│  │   ├── removeAggregate()         │    │
│  │   ├── onEntityAdd/Update/Delete │    │
│  │   └── query()                   │    │
│  └──────────────┬──────────────────┘    │
│                 │ provider interface    │
│  ┌──────────────▼──────────────────┐    │
│  │   Cortex AggregationProvider    │    │
│  │   ├── NativeAggregationEngine ◄─┼────┼──── Rust (NAPI)
│  │   ├── BTreeMap MIN/MAX          │    │
│  │   ├── Rayon parallel rebuild    │    │
│  │   └── Welford's online stddev   │    │
│  └─────────────────────────────────┘    │
└─────────────────────────────────────────┘
```

When Cortex is installed as a Brainy plugin, the aggregation provider automatically registers. All aggregation computation runs in Rust through NAPI bindings while Brainy handles storage, persistence, and the public API.
## Operations

Cortex supports seven aggregation operations:

| Operation | Description | Precision |
|---|---|---|
| `sum` | Running total of a numeric field | Exact (`f64`) |
| `count` | Number of matching entities | Exact (`u64`) |
| `avg` | Running average (sum / count) | Exact (`f64`) |
| `min` | Minimum value across all entities | Exact (`BTreeMap`) |
| `max` | Maximum value across all entities | Exact (`BTreeMap`) |
| `stddev` | Sample standard deviation | Online (Welford's) |
| `variance` | Sample variance | Online (Welford's) |
## BTreeMap Precise MIN/MAX

Unlike simpler implementations that become stale after deletes, Cortex uses a Rust `BTreeMap<OrderedFloat<f64>, u64>` to track the exact frequency of every value:

- Add — insert or increment the count for the value
- Delete — decrement the count; remove the key if the count reaches zero
- Query — `BTreeMap::first_key_value()` for MIN, `BTreeMap::last_key_value()` for MAX
This guarantees precise MIN/MAX after any sequence of add/update/delete operations without requiring a full rescan.
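The bookkeeping above can be sketched in std-only Rust. This is an illustrative sketch, not Cortex's actual code: it uses `i64` keys in place of `OrderedFloat<f64>`, which comes from the external `ordered-float` crate.

```rust
use std::collections::BTreeMap;

/// Frequency map over values; MIN/MAX stay precise across deletes.
#[derive(Default)]
struct MinMax {
    counts: BTreeMap<i64, u64>,
}

impl MinMax {
    /// Add: insert or increment the count for the value.
    fn add(&mut self, v: i64) {
        *self.counts.entry(v).or_insert(0) += 1;
    }

    /// Delete: decrement the count; remove the key once it hits zero.
    fn delete(&mut self, v: i64) {
        if let Some(c) = self.counts.get_mut(&v) {
            *c -= 1;
            if *c == 0 {
                self.counts.remove(&v);
            }
        }
    }

    /// Query: smallest and largest keys still present.
    fn min(&self) -> Option<i64> {
        self.counts.first_key_value().map(|(k, _)| *k)
    }
    fn max(&self) -> Option<i64> {
        self.counts.last_key_value().map(|(k, _)| *k)
    }
}
```

Because duplicate values are reference-counted, deleting one of two identical maxima leaves the MAX unchanged, and deleting the last copy falls back to the next key without a rescan.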
## Welford's Online Algorithm

Standard deviation and variance use Welford's numerically stable online algorithm with M2 tracking (the sum of squared differences from the running mean). This computes incrementally without storing all values:

```text
on add(x):
    n += 1
    delta = x - mean
    mean += delta / n
    delta2 = x - mean
    M2 += delta * delta2

sample variance = M2 / (n - 1)
sample stddev   = sqrt(sample variance)
```

## Rayon Parallel Rebuild
When rebuilding an aggregate from scratch (e.g., after definition change or cold start), Cortex uses Rayon's parallel iterators to process entities across all CPU cores:
- Datasets below 1,000 entities are processed sequentially (the threading overhead isn't worth it)
- Above 1,000, Rayon splits the work across threads
- Each thread computes partial aggregation state
- Results are merged with thread-safe combining
For 100K entities with 20 groups, rebuild completes in ~15ms.
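The split/merge scheme can be sketched with `std::thread` standing in for Rayon. The merge step uses Chan et al.'s formula for combining two partial Welford states; whether Cortex combines partials exactly this way is an assumption.

```rust
use std::thread;

#[derive(Clone, Copy, Default)]
struct Welford { n: f64, mean: f64, m2: f64 }

impl Welford {
    /// Standard single-value Welford update.
    fn add(&mut self, x: f64) {
        self.n += 1.0;
        let delta = x - self.mean;
        self.mean += delta / self.n;
        self.m2 += delta * (x - self.mean);
    }

    /// Chan et al.: combine two partial states computed independently.
    fn merge(a: Welford, b: Welford) -> Welford {
        if a.n == 0.0 { return b; }
        if b.n == 0.0 { return a; }
        let n = a.n + b.n;
        let delta = b.mean - a.mean;
        Welford {
            n,
            mean: a.mean + delta * b.n / n,
            m2: a.m2 + b.m2 + delta * delta * a.n * b.n / n,
        }
    }

    fn variance(&self) -> f64 { self.m2 / (self.n - 1.0) }
}

/// Split values across threads, compute partial states, merge the results.
fn parallel_welford(values: &[f64], threads: usize) -> Welford {
    if values.is_empty() { return Welford::default(); }
    let chunk = values.len().div_ceil(threads);
    thread::scope(|s| {
        let handles: Vec<_> = values
            .chunks(chunk)
            .map(|c| s.spawn(move || {
                let mut w = Welford::default();
                for &x in c { w.add(x); }
                w
            }))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .fold(Welford::default(), Welford::merge)
    })
}
```

The merged result is bit-for-bit independent of how the input was chunked only up to floating-point rounding, which is why the merge formula (rather than naive re-summation) matters for numerical stability.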
## Time Window Bucketing

GroupBy dimensions can specify time windows for temporal aggregation. The native engine performs integer-based timestamp bucketing without allocating Date objects:
| Window | Format | Example |
|---|---|---|
| `hour` | `YYYY-MM-DD-HH` | `2024-01-15-14` |
| `day` | `YYYY-MM-DD` | `2024-01-15` |
| `week` | `YYYY-WNN` | `2024-W03` |
| `month` | `YYYY-MM` | `2024-01` |
| `quarter` | `YYYY-QN` | `2024-Q1` |
| `year` | `YYYY` | `2024` |
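Integer-only bucketing can be sketched in Rust as below. The calendar math is the standard civil-from-days conversion; this is illustrative rather than Cortex's actual code, and `week` is omitted because ISO week numbering needs extra logic.

```rust
/// Days since 1970-01-01 -> (year, month, day); civil-from-days conversion.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = z - era * 146_097;                                        // [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);                  // [0, 365]
    let mp = (5 * doy + 2) / 153;                                       // [0, 11]
    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;                      // [1, 31]
    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;             // [1, 12]
    (yoe + era * 400 + i64::from(m <= 2), m, d)
}

/// Bucket a unix timestamp (seconds) into a group-key string.
fn bucket(ts: i64, window: &str) -> String {
    let (y, m, d) = civil_from_days(ts.div_euclid(86_400));
    match window {
        "hour"    => format!("{y:04}-{m:02}-{d:02}-{:02}", ts.rem_euclid(86_400) / 3_600),
        "day"     => format!("{y:04}-{m:02}-{d:02}"),
        "month"   => format!("{y:04}-{m:02}"),
        "quarter" => format!("{y:04}-Q{}", (m - 1) / 3 + 1),
        "year"    => format!("{y:04}"),
        other     => panic!("unsupported window: {other}"),
    }
}
```

Everything above is integer division on the raw timestamp, so hot-path bucketing allocates only the output string.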
Combined groupBy dimensions (a plain field plus a time window) produce composite group keys:

```javascript
brain.defineAggregate({
  name: 'monthly_sales',
  source: { type: 'Event' },
  groupBy: ['region', { field: 'date', window: 'month' }],
  metrics: {
    revenue: { op: 'sum', field: 'amount' },
    count: { op: 'count' }
  }
})
```

This produces groups like `{ region: 'US', date: '2024-01' }`.
## State Serialization

The engine serializes all internal state (definitions, group states, BTreeMap contents, Welford's M2 values) to JSON for persistence:

```javascript
// Brainy handles this automatically via the provider interface
const state = engine.serializeState() // JSON string
engine.restoreState(state)           // Restore on restart
```

State includes:

- All registered definitions
- Per-aggregate, per-group metric state
- BTreeMap contents for MIN/MAX
- Welford's `mean`, `M2`, and `count` for stddev/variance
## Source Filtering

Aggregate definitions can specify source filters to only aggregate entities of a specific type:

```json
{
  "name": "event_stats",
  "source": { "type": "Event" },
  "groupBy": ["category"],
  "metrics": { "count": { "op": "count" } }
}
```

During incremental updates, entities that don't match the source filter are skipped. During rebuild, the filter is compiled and applied before aggregation.

Aggregate entities themselves (entities with `service: 'brainy:aggregation'` or `metadata.__aggregate: true`) are always skipped to prevent infinite feedback loops.
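The two skip rules can be sketched as a single predicate. The `Entity` struct here is a simplified stand-in for illustration, not Cortex's real type.

```rust
struct Entity {
    entity_type: String,
    service: Option<String>,
    is_aggregate_meta: bool, // stand-in for metadata.__aggregate
}

/// True when the entity must not feed the aggregate.
fn should_skip(e: &Entity, source_type: Option<&str>) -> bool {
    // Feedback-loop guard: never aggregate aggregate entities themselves.
    if e.service.as_deref() == Some("brainy:aggregation") || e.is_aggregate_meta {
        return true;
    }
    // Source filter: skip entities whose type doesn't match the definition.
    matches!(source_type, Some(t) if t != e.entity_type.as_str())
}
```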
## Incremental Update Flow

When Brainy calls `incrementalUpdate()`:

- Source filter check — skip if the entity doesn't match
- Aggregate entity check — skip if the entity is itself an aggregate
- Group key computation — extract groupBy fields, apply time bucketing
- Metric update — for each metric in the definition:
  - `add`: increment sum/count/mean/M2, insert into the BTreeMap
  - `delete`: decrement sum/count/mean/M2, remove from the BTreeMap
  - `update`: delete old values, then add new values (handles group changes)
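The delete-then-add shape of `update` can be sketched for a sum/count metric; the real engine also maintains the Welford and BTreeMap state alongside these fields.

```rust
use std::collections::HashMap;

#[derive(Default)]
struct GroupState { sum: f64, count: u64 }

#[derive(Default)]
struct SumCountAggregate {
    groups: HashMap<String, GroupState>, // group key -> running state
}

impl SumCountAggregate {
    fn add(&mut self, key: &str, value: f64) {
        let g = self.groups.entry(key.to_string()).or_default();
        g.sum += value;
        g.count += 1;
    }

    fn delete(&mut self, key: &str, value: f64) {
        if let Some(g) = self.groups.get_mut(key) {
            g.sum -= value;
            g.count -= 1;
            if g.count == 0 {
                self.groups.remove(key); // drop empty groups
            }
        }
    }

    /// Update = delete old + add new; a changed group key just works.
    fn update(&mut self, old_key: &str, old_val: f64, new_key: &str, new_val: f64) {
        self.delete(old_key, old_val);
        self.add(new_key, new_val);
    }
}
```

Modeling `update` as delete-plus-add is what lets an entity that moves between groups (for example, a changed `region`) decrement its old group and increment its new one in a single pass.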
## Performance

Run the included benchmarks: `npm run bench`

| Operation | Throughput | Latency |
|---|---|---|
| `incrementalUpdate` (1K entities) | 809 ops/s | 1.2 ms |
| `rebuildAggregate` (10K entities) | 475 ops/s | 2.1 ms |
| `rebuildAggregate` (100K entities) | 66 ops/s | 15.2 ms |
| `queryAggregate` (1K groups, sort + paginate) | 986 ops/s | 1.0 ms |
| `computeGroupKey` (10K entities) | 146 ops/s | 6.8 ms |
## Troubleshooting

### Aggregation not using native engine

Verify Cortex is loaded and the aggregation provider is registered:

```javascript
const diag = brain.diagnostics()
console.log(diag.providers.aggregation)
// Should show { source: 'plugin' }
```

### Stale MIN/MAX after deletes
This should not happen with Cortex — the BTreeMap guarantees precision. If you see stale values, verify you're running Cortex (not the JS fallback) and that the delete operation includes the correct entity metadata.
### Rebuild performance

For large datasets, rebuild uses Rayon parallelism automatically (it kicks in above the 1,000-entity threshold). Ensure your system has multiple CPU cores available. Single-core environments still work but won't benefit from parallel rebuild.