Cortex Aggregation Engine
Cortex provides a Rust-accelerated aggregation engine for Brainy, enabling real-time analytics on entity data with incremental updates, parallel rebuilds, and precise statistical operations.
Architecture
Brainy owns the storage and lifecycle. Cortex owns the compute.
┌─────────────────────────────────────────┐
│  Brainy                                 │
│  ┌─────────────────────────────────┐    │
│  │  AggregationIndex               │    │
│  │  ├── defineAggregate()          │    │
│  │  ├── removeAggregate()          │    │
│  │  ├── onEntityAdd/Update/Delete  │    │
│  │  └── query()                    │    │
│  └──────────────┬──────────────────┘    │
│                 │ provider interface    │
│  ┌──────────────▼──────────────────┐    │
│  │  Cortex AggregationProvider     │    │
│  │  ├── NativeAggregationEngine    │───── Rust (NAPI)
│  │  ├── value multiset (min/max,   │    │
│  │  │   percentile, distinctCount) │    │
│  │  ├── HAVING + array-unnest      │    │
│  │  ├── Rayon parallel rebuild     │    │
│  │  └── Welford's online stddev    │    │
│  └─────────────────────────────────┘    │
└─────────────────────────────────────────┘
When Cortex is installed as a Brainy plugin, the aggregation provider automatically registers. All aggregation computation runs in Rust through NAPI bindings while Brainy handles storage, persistence, and the public API.
Operations
Cortex supports all 9 aggregation operations:
| Operation | Description | Precision |
|---|---|---|
| sum | Running total of a numeric field | Exact (f64) |
| count | Number of matching entities | Exact (u64) |
| avg | Running average (sum/count) | Exact (f64) |
| min | Minimum value across all entities | Exact (value multiset) |
| max | Maximum value across all entities | Exact (value multiset) |
| stddev | Sample standard deviation | Online (Welford's) |
| variance | Sample variance | Online (Welford's) |
| percentile | Value at fraction p (e.g. p50, p95) | Exact (value multiset, linear interpolation) |
| distinctCount | Number of distinct values | Exact (value multiset) |
percentile requires a p property in [0, 1] on the metric definition
({ op: 'percentile', field: 'latency', p: 0.95 }). Both percentile and
distinctCount are exact — they read the same per-group value multiset that
backs precise MIN/MAX, so they are never approximate and never stale after
deletes. Percentile uses standard linear interpolation between ranks, verified
against numpy's percentiles in src/aggregation/aggregation.test.ts.
Exact value multiset (MIN/MAX, percentile, distinctCount)
Unlike simpler implementations that become stale after deletes, Cortex keeps a per-group Rust BTreeMap<OrderedFloat<f64>, u32> (a sorted value multiset) tracking the exact frequency of every value:
- Add: Insert or increment the count for the value
- Delete: Decrement the count; remove the key if count reaches zero
- MIN / MAX: first / last key of the multiset
- percentile: walk the sorted multiset to the target rank, linear-interpolating between neighbours
- distinctCount: number of keys in the multiset
This single structure guarantees exact MIN, MAX, percentile, and distinct-count after any sequence of add/update/delete operations without requiring a full rescan.
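To make the bookkeeping above concrete, here is a minimal TypeScript sketch of such a multiset. It is an illustration only, not Cortex's code: the class name `ValueMultiset` and its methods are hypothetical, and a `Map` sorted on demand stands in for the Rust BTreeMap, so each read here costs a sort that the real sorted map avoids.

```typescript
// Hypothetical TypeScript analogue of the per-group value multiset.
// Cortex keeps a sorted Rust BTreeMap; this sketch sorts a Map's keys on demand.
class ValueMultiset {
  private counts = new Map<number, number>();

  // Add: insert the value or increment its count.
  add(value: number): void {
    this.counts.set(value, (this.counts.get(value) ?? 0) + 1);
  }

  // Delete: decrement the count and drop the key when it reaches zero.
  remove(value: number): void {
    const current = this.counts.get(value);
    if (current === undefined) return; // value was never added
    if (current <= 1) this.counts.delete(value);
    else this.counts.set(value, current - 1);
  }

  private sortedKeys(): number[] {
    return Array.from(this.counts.keys()).sort((a, b) => a - b);
  }

  min(): number | undefined {
    return this.sortedKeys()[0];
  }

  max(): number | undefined {
    const keys = this.sortedKeys();
    return keys[keys.length - 1];
  }

  distinctCount(): number {
    return this.counts.size;
  }

  // Linear interpolation between ranks (numpy's default convention), p in [0, 1].
  percentile(p: number): number | undefined {
    const keys = this.sortedKeys();
    let total = 0;
    for (const k of keys) total += this.counts.get(k) ?? 0;
    if (total === 0) return undefined;

    // Value at a zero-based position in the expanded, sorted sequence.
    const at = (index: number): number => {
      let seen = 0;
      let last = NaN;
      for (const k of keys) {
        last = k;
        seen += this.counts.get(k) ?? 0;
        if (index < seen) return k;
      }
      return last;
    };

    const rank = p * (total - 1);
    const lo = Math.floor(rank);
    const hi = Math.ceil(rank);
    const a = at(lo);
    const b = at(hi);
    return a + (b - a) * (rank - lo);
  }
}
```

With the values 10, 20, 30, 40 added, `percentile(0.5)` interpolates halfway between 20 and 30. After removing 40, `max()` falls back to 30 without rescanning any entities.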
Welford's Online Algorithm
Standard deviation and variance use Welford's numerically-stable online algorithm with M2 tracking (sum of squared differences from the running mean). This computes incrementally without storing all values:
On add(x):
  n += 1
  delta = x - mean
  mean += delta / n
  delta2 = x - mean
  M2 += delta * delta2
Sample variance = M2 / (n - 1)
Sample stddev = sqrt(variance)
Rayon Parallel Rebuild
When rebuilding an aggregate from scratch (e.g., after definition change or cold start), Cortex uses Rayon's parallel iterators to process entities across all CPU cores:
- Rebuilds of fewer than 1,000 entities run sequentially (the threading overhead is not worth it)
- Above 1,000 entities, Rayon splits the work across threads
- Each thread computes partial aggregation state
- Results are merged with thread-safe combining
For 100K entities with 20 groups, rebuild completes in ~15ms.
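The merge step is what makes this scheme work: each partial state has to combine into the same result a single sequential pass would give. The TypeScript sketch below shows one way this could look for sum, count, and the Welford state, using the standard pairwise combination formula for mean and M2 (Chan et al.). The names `PartialState`, `accumulate`, `merge`, and `rebuild` are hypothetical, the chunks are processed sequentially here, and whether Cortex merges exactly this way is an assumption.

```typescript
// Hypothetical per-chunk aggregation state (names are illustrative, not Cortex's).
interface PartialState {
  n: number;    // count
  sum: number;  // running total
  mean: number; // Welford running mean
  m2: number;   // Welford sum of squared differences from the mean
}

const emptyState = (): PartialState => ({ n: 0, sum: 0, mean: 0, m2: 0 });

// Welford's add step, as described in the section above.
function accumulate(state: PartialState, x: number): PartialState {
  const n = state.n + 1;
  const delta = x - state.mean;
  const mean = state.mean + delta / n;
  const m2 = state.m2 + delta * (x - mean);
  return { n, sum: state.sum + x, mean, m2 };
}

// Combine two partial states into the state of their union.
function merge(a: PartialState, b: PartialState): PartialState {
  if (a.n === 0) return b;
  if (b.n === 0) return a;
  const n = a.n + b.n;
  const delta = b.mean - a.mean;
  return {
    n,
    sum: a.sum + b.sum,
    mean: a.mean + (delta * b.n) / n,
    m2: a.m2 + b.m2 + (delta * delta * a.n * b.n) / n,
  };
}

// Stand-in for the parallel rebuild: each chunk plays the role of one thread's work.
function rebuild(values: number[], chunkSize: number): PartialState {
  let total = emptyState();
  for (let i = 0; i < values.length; i += chunkSize) {
    const part = values.slice(i, i + chunkSize).reduce(accumulate, emptyState());
    total = merge(total, part);
  }
  return total;
}
```

Because `merge` is associative up to floating-point rounding, the chunk boundaries do not change the result in any meaningful way, which is the property a work-splitting scheduler relies on.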
Time Window Bucketing
GroupBy dimensions can specify time windows for temporal aggregation. The native engine performs integer-based timestamp bucketing without allocating Date objects:
| Window | Format | Example |
|---|---|---|
| hour | YYYY-MM-DDTHH | 2024-01-15T14 |
| day | YYYY-MM-DD | 2024-01-15 |
| week | YYYY-WNN (ISO week) | 2024-W03 |
| month | YYYY-MM | 2024-01 |
| quarter | YYYY-QN | 2024-Q1 |
| year | YYYY | 2024 |
| custom (N seconds) | ISO 8601 UTC, floored | 2024-01-15T10:30:00.000Z |
Keys are computed with pure integer arithmetic (no Date allocation) and are byte-for-byte identical to Brainy's bucketTimestamp() (native/src/aggregation/timestamp.rs).
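As a rough illustration of what Date-free bucketing can look like, the TypeScript sketch below derives the hour, day, month, quarter, and year keys from an epoch-millisecond timestamp using the well-known days-to-civil-date integer conversion. The function names `civilFromDays` and `bucketKey` are hypothetical, the week and custom windows are left out, and the actual arithmetic in timestamp.rs may differ.

```typescript
const MS_PER_HOUR = 3600000;
const MS_PER_DAY = 86400000;

type TimeWindow = 'hour' | 'day' | 'month' | 'quarter' | 'year';

// Days since 1970-01-01 (UTC) -> proleptic Gregorian date, integer math only.
function civilFromDays(days: number): { year: number; month: number; day: number } {
  const z = days + 719468; // shift the epoch to 0000-03-01
  const era = Math.floor(z / 146097); // 400-year cycles
  const doe = z - era * 146097; // day of era, 0..146096
  const yoe = Math.floor(
    (doe - Math.floor(doe / 1460) + Math.floor(doe / 36524) - Math.floor(doe / 146096)) / 365
  ); // year of era, 0..399
  const doy = doe - (365 * yoe + Math.floor(yoe / 4) - Math.floor(yoe / 100)); // 0..365
  const mp = Math.floor((5 * doy + 2) / 153); // month index, March = 0
  const day = doy - Math.floor((153 * mp + 2) / 5) + 1;
  const month = mp < 10 ? mp + 3 : mp - 9;
  const year = yoe + era * 400 + (month <= 2 ? 1 : 0);
  return { year, month, day };
}

const pad = (n: number, width = 2): string => String(n).padStart(width, '0');

// Bucket key for a UTC epoch-millisecond timestamp (week and custom windows omitted).
function bucketKey(epochMs: number, win: TimeWindow): string {
  const days = Math.floor(epochMs / MS_PER_DAY);
  const hour = Math.floor((epochMs - days * MS_PER_DAY) / MS_PER_HOUR);
  const { year, month, day } = civilFromDays(days);
  const y = pad(year, 4);
  switch (win) {
    case 'year':
      return y;
    case 'quarter':
      return `${y}-Q${Math.floor((month - 1) / 3) + 1}`;
    case 'month':
      return `${y}-${pad(month)}`;
    case 'day':
      return `${y}-${pad(month)}-${pad(day)}`;
    case 'hour':
      return `${y}-${pad(month)}-${pad(day)}T${pad(hour)}`;
  }
}

// 1705329000000 is 2024-01-15T14:30:00Z
bucketKey(1705329000000, 'hour');    // '2024-01-15T14'
bucketKey(1705329000000, 'quarter'); // '2024-Q1'
```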
Combined groupBy dimensions (plain field + time window) produce composite group keys:
brain.defineAggregate({
  name: 'monthly_sales',
  source: { type: 'Event' },
  groupBy: ['region', { field: 'date', window: 'month' }],
  metrics: {
    revenue: { op: 'sum', field: 'amount' },
    count: { op: 'count' }
  }
})
This produces groups like { region: 'US', date: '2024-01' }.
Array-Unnest GroupBy
A groupBy dimension can unnest an array field, so an entity contributes once per
distinct array element. The classic use is tag/label frequency:
brain.defineAggregate({
  name: 'tag_frequency',
  source: { type: 'Post' },
  groupBy: [{ field: 'tags', unnest: true }],
  metrics: { count: { op: 'count' } }
})
A post with tags: ['rust', 'db', 'rust'] contributes once to rust and once to
db (duplicates within one entity are de-duplicated). An entity with a missing or
empty array contributes to no group.
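The contribution rule can be sketched in a few lines of TypeScript. This is an illustration of the semantics described above rather than the native implementation; `unnestKeys` and `tagFrequency` are hypothetical helpers, and reading the array straight off the entity object is an assumption about the entity shape.

```typescript
type Entity = Record<string, unknown>;

// Group keys that one entity contributes to for an unnested array field.
function unnestKeys(entity: Entity, field: string): string[] {
  const value = entity[field];
  if (!Array.isArray(value)) return []; // missing field: contributes to no group
  return Array.from(new Set(value.map(String))); // de-duplicate within one entity
}

// Count how many entities carry each tag.
function tagFrequency(posts: Entity[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const post of posts) {
    for (const key of unnestKeys(post, 'tags')) {
      counts.set(key, (counts.get(key) ?? 0) + 1);
    }
  }
  return counts;
}
```

Passing the example post together with a second post tagged only `rust` gives a count of 2 for rust and 1 for db.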
HAVING
queryAggregate can filter groups after metrics are computed via a having
clause, using the same Brainy field-operator syntax as where (greaterThan,
lessThan, between, anyOf/allOf/not, …). It applies to computed metric
values and to count:
brain.queryAggregate('monthly_sales', {
  having: { revenue: { greaterThan: 10000 } }
})
Filtering happens post-aggregation in Rust (O(groups)), so a having clause never
re-scans entities.
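Conceptually, the filter is a single pass over already-computed group rows. The TypeScript sketch below covers only the greaterThan and lessThan operators. The row shape `MetricRow`, the function `applyHaving`, and the choice to drop groups that lack the referenced metric are all assumptions made for illustration.

```typescript
// Hypothetical shapes: one row per group, metrics already computed.
type MetricRow = { key: string; metrics: Record<string, number> };
type Condition = { greaterThan?: number; lessThan?: number };
type Having = Record<string, Condition>;

// Keep only the groups whose metric values satisfy every condition.
function applyHaving(rows: MetricRow[], having: Having): MetricRow[] {
  return rows.filter((row) =>
    Object.entries(having).every(([metric, cond]) => {
      const value = row.metrics[metric];
      if (value === undefined) return false; // assumption: unknown metric fails the filter
      if (cond.greaterThan !== undefined && !(value > cond.greaterThan)) return false;
      if (cond.lessThan !== undefined && !(value < cond.lessThan)) return false;
      return true;
    })
  );
}
```

The cost depends only on the number of groups and conditions, not on how many entities fed into each group.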
State Serialization
The engine serializes all internal state (definitions, group states, the per-group value multiset, Welford's M2 values) to JSON for persistence:
// Brainy handles this automatically via the provider interface
const state = engine.serializeState() // JSON string
engine.restoreState(state) // Restore on restart
State includes:
- All registered definitions
- Per-aggregate, per-group metric state
- The value multiset backing MIN/MAX, percentile, and distinctCount
- Welford's mean, M2, and count for stddev/variance
Source Filtering
Aggregate definitions can specify source filters to only aggregate entities of a specific type:
{
  "name": "event_stats",
  "source": { "type": "Event" },
  "groupBy": ["category"],
  "metrics": { "count": { "op": "count" } }
}
During incremental updates, entities that don't match the source filter are skipped. During rebuild, the filter is compiled and applied before aggregation.
Aggregate entities themselves (entities with service: 'brainy:aggregation' or metadata.__aggregate: true) are always skipped to prevent infinite feedback loops.
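The two skip rules amount to a small predicate. The TypeScript sketch below is a hedged illustration: `EntityLike`, `isAggregateEntity`, and `shouldAggregate` are hypothetical names, and the assumption that an entity exposes `type`, `service`, and `metadata` as top-level properties is not confirmed by Brainy's API.

```typescript
// Hypothetical entity shape; the real Brainy entity may carry these fields elsewhere.
interface EntityLike {
  type?: string;
  service?: string;
  metadata?: Record<string, unknown>;
}

// Aggregate results are themselves stored as entities and must never be re-aggregated.
function isAggregateEntity(e: EntityLike): boolean {
  return e.service === 'brainy:aggregation' || e.metadata?.['__aggregate'] === true;
}

// True when the entity should feed into an aggregate with the given source filter.
function shouldAggregate(e: EntityLike, source: { type?: string }): boolean {
  if (source.type !== undefined && e.type !== source.type) return false; // source filter
  return !isAggregateEntity(e); // feedback-loop guard
}
```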
Incremental Update Flow
When Brainy calls incrementalUpdate():
- Source filter check — skip if entity doesn't match
- Aggregate entity check — skip if entity is itself an aggregate
- Group key computation — extract groupBy fields, apply time bucketing
- Metric update — for each metric in the definition:
  - add: Increment sum/count/mean/M2, insert into BTreeMap
  - delete: Decrement sum/count/mean/M2, remove from BTreeMap
  - update: Delete old values, add new values (handles group changes)
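The add, delete, and update paths for the running sum, count, mean, and M2 can be sketched as follows in TypeScript. The delete step inverts Welford's add step algebraically, which is one common way to support removal; whether Cortex uses exactly this formula, and whether it drops groups that become empty, are assumptions. The names `GroupState`, `addValue`, `removeValue`, `applyChange`, and `applyUpdate` are hypothetical, and the multiset bookkeeping is left out.

```typescript
interface GroupState { count: number; sum: number; mean: number; m2: number; }
type Groups = Map<string, GroupState>;

// Welford's add step.
function addValue(s: GroupState, x: number): void {
  s.count += 1;
  s.sum += x;
  const delta = x - s.mean;
  s.mean += delta / s.count;
  s.m2 += delta * (x - s.mean);
}

// Inverse of the add step: recover the mean and M2 from before x was added.
function removeValue(s: GroupState, x: number): void {
  if (s.count <= 1) {
    s.count = 0; s.sum = 0; s.mean = 0; s.m2 = 0;
    return;
  }
  const n = s.count - 1;
  const oldMean = s.mean - (x - s.mean) / n;
  s.m2 = Math.max(0, s.m2 - (x - oldMean) * (x - s.mean)); // clamp rounding noise
  s.mean = oldMean;
  s.count = n;
  s.sum -= x;
}

function applyChange(groups: Groups, op: 'add' | 'delete', key: string, x: number): void {
  let state = groups.get(key);
  if (op === 'add') {
    if (!state) {
      state = { count: 0, sum: 0, mean: 0, m2: 0 };
      groups.set(key, state);
    }
    addValue(state, x);
  } else if (state) {
    removeValue(state, x);
    if (state.count === 0) groups.delete(key); // assumption: empty groups are dropped
  }
}

// Update = delete the old contribution, then add the new one (the group may change).
function applyUpdate(groups: Groups, oldKey: string, oldX: number, newKey: string, newX: number): void {
  applyChange(groups, 'delete', oldKey, oldX);
  applyChange(groups, 'add', newKey, newX);
}
```

Treating an update as delete-then-add keeps the group-change case simple: the old group loses the entity's contribution and the new group gains it, with no special handling when the two keys happen to match.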
Performance
Run the included benchmarks: npm run bench
| Operation | Throughput | Latency |
|---|---|---|
| incrementalUpdate (1K entities) | 809 ops/s | 1.2 ms |
| rebuildAggregate (10K entities) | 475 ops/s | 2.1 ms |
| rebuildAggregate (100K entities) | 66 ops/s | 15.2 ms |
| queryAggregate (1K groups, sort + paginate) | 986 ops/s | 1.0 ms |
| computeGroupKey (10K entities) | 146 ops/s | 6.8 ms |
Troubleshooting
Aggregation not using native engine
Verify Cortex is loaded and the aggregation provider is registered:
const diag = brain.diagnostics()
console.log(diag.providers.aggregation)
// Should show { source: 'plugin' }
Stale MIN/MAX after deletes
This should not happen with Cortex — the BTreeMap guarantees precision. If you see stale values, verify you're running Cortex (not the JS fallback) and that the delete operation includes the correct entity metadata.
Rebuild performance
Rebuild switches to Rayon parallelism automatically above the 1,000-entity threshold, and the benefit is most visible on large datasets (100K entities and up). Ensure your system has multiple CPU cores available. Single-core environments still work but won't benefit from parallel rebuild.