Observation History and Delay#
Observations have two temporal features: history and delay. History stacks past frames for temporal context, while delay can be used to model sensor latency.
TL;DR#
Add history to stack frames:
from mjlab.managers.observation_manager import ObservationTermCfg

joint_vel: ObservationTermCfg = ObservationTermCfg(
    func=joint_vel,
    history_length=5,  # Keep last 5 frames
    flatten_history_dim=True,  # Flatten for MLP: 5 frames × 12 dims → (60,)
)
Add delay to model sensor latency:
# At 50Hz control (20ms/step): lag=2-3 → 40-60ms latency
camera: ObservationTermCfg = ObservationTermCfg(
    func=camera_obs,
    delay_min_lag=2,
    delay_max_lag=3,
)
Combine both:
joint_pos: ObservationTermCfg = ObservationTermCfg(
    func=joint_pos,
    delay_min_lag=1,
    delay_max_lag=3,  # Delayed observations
    history_length=5,  # Stack 5 delayed frames
    flatten_history_dim=True,
)
# Pipeline: compute → delay → stack → flatten
Observation History#
History stacks past observations to provide temporal context.
Basic Usage#
Flattened history (for MLPs):
joint_vel: ObservationTermCfg = ObservationTermCfg(
    func=joint_vel,  # Returns (num_envs, 12)
    history_length=3,
    flatten_history_dim=True,  # Output: (num_envs, 36)
)
Structured history (for RNNs):
joint_vel: ObservationTermCfg = ObservationTermCfg(
    func=joint_vel,  # Returns (num_envs, 12)
    history_length=3,
    flatten_history_dim=False,  # Output: (num_envs, 3, 12)
)
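The two output shapes above can be reproduced with plain NumPy. This is a minimal sketch rather than mjlab's implementation: the `frames` list is a hypothetical stand-in for three consecutive `joint_vel` results, and the oldest-first ordering inside the flattened vector is an assumption.

```python
import numpy as np

num_envs, obs_dim, history_length = 4, 12, 3

# Hypothetical stand-in for three consecutive joint_vel frames, oldest first.
frames = [np.full((num_envs, obs_dim), float(t)) for t in range(history_length)]

# Structured history: stack along a new time axis -> (num_envs, 3, 12).
structured = np.stack(frames, axis=1)

# Flattened history: merge the time and feature axes -> (num_envs, 36).
flattened = structured.reshape(num_envs, history_length * obs_dim)

print(structured.shape)  # (4, 3, 12)
print(flattened.shape)   # (4, 36)
```

In this sketch the first 12 entries of each flattened row hold the oldest frame and the last 12 hold the newest.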
Group-Level Override#
Apply history to all terms in a group:
@dataclass
class PolicyCfg(ObservationGroupCfg):
    concatenate_terms: bool = True
    history_length: int = 5  # Applied to all terms
    flatten_history_dim: bool = True

    joint_pos: ObservationTermCfg = ObservationTermCfg(func=joint_pos)
    joint_vel: ObservationTermCfg = ObservationTermCfg(func=joint_vel)
    # Both terms get 5-frame history, flattened
Term-level settings override group settings:
@dataclass
class PolicyCfg(ObservationGroupCfg):
    history_length: int = 3  # Default for group

    joint_pos: ObservationTermCfg = ObservationTermCfg(
        func=joint_pos,
        history_length=5,  # Override: use 5 instead of 3
    )
Reset Behavior#
History buffers are cleared on environment reset. The first observation after reset is backfilled across all history slots, ensuring valid data from step 0.
# At reset
buffer = [obs_0, obs_0, obs_0]  # Backfilled

# After 2 steps
buffer = [obs_0, obs_1, obs_2]  # Normal accumulation
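The backfill behavior can be sketched for a single environment with a fixed-length deque. `HistoryBuffer` is a hypothetical toy class written for this illustration, not mjlab's buffer, and it uses strings in place of observation tensors.

```python
from collections import deque


class HistoryBuffer:
    """Toy single-environment history buffer (hypothetical, not mjlab's class)."""

    def __init__(self, history_length: int):
        self.history_length = history_length
        self.frames = deque(maxlen=history_length)

    def reset(self) -> None:
        # Clearing marks the buffer as "needs backfill" on the next append.
        self.frames.clear()

    def append(self, obs):
        if not self.frames:
            # First observation after reset: copy it into every slot.
            self.frames.extend([obs] * self.history_length)
        else:
            # Normal accumulation: the deque drops the oldest frame.
            self.frames.append(obs)
        return list(self.frames)


buf = HistoryBuffer(history_length=3)
buf.reset()
at_reset = buf.append("obs_0")
buf.append("obs_1")
after_two = buf.append("obs_2")
print(at_reset)   # ['obs_0', 'obs_0', 'obs_0']
print(after_two)  # ['obs_0', 'obs_1', 'obs_2']
```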
History Flattening Order (Term-Major vs Time-Major)#
When flatten_history_dim=True and concatenate_terms=True, mjlab uses
term-major ordering, where each term’s full history is flattened before
concatenating terms:
Term A: shape (num_envs, obs_dim_A) with history_length=3
Term B: shape (num_envs, obs_dim_B) with history_length=3
mjlab output (TERM-MAJOR):
[A_t0, A_t1, A_t2, B_t0, B_t1, B_t2, ...]
└─ all A history ─┘ └─ all B history ─┘
An alternative approach is time-major (or frame-major) ordering, where complete observation frames are built at each timestep before concatenating across time:
TIME-MAJOR (alternative approach):
[A_t0, B_t0, ..., A_t1, B_t1, ..., A_t2, B_t2, ...]
└─ frame t0 ──┘ └─ frame t1 ──┘ └─ frame t2 ──┘
Sim2sim compatibility: If you transfer policies to or from a framework that uses time-major ordering, you must reorder the observation vector to match. Only policies trained with history are affected; without history, each term contributes a single frame and the two orderings are identical.
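One way to do that reordering is sketched below with NumPy. `term_major_to_time_major` is a hypothetical helper, not part of mjlab. It assumes every term shares the same history length and that each term's block is stored oldest frame first, as in the term-major diagram above.

```python
import numpy as np


def term_major_to_time_major(obs, term_dims, history_length):
    """Reorder a flattened (num_envs, total) observation from term-major to time-major.

    Assumes every term uses the same history_length and that each term's
    block is laid out oldest frame first.
    """
    num_envs = obs.shape[0]
    blocks, start = [], 0
    for dim in term_dims:
        width = history_length * dim
        # Slice one term's block and restore its (time, feature) axes.
        block = obs[:, start:start + width]
        blocks.append(block.reshape(num_envs, history_length, dim))
        start += width
    # Concatenate terms within each frame, then flatten across time.
    frames = np.concatenate(blocks, axis=2)
    return frames.reshape(num_envs, -1)


# Term A has 2 dims, term B has 1 dim, history_length=3.
# A_tk = [10k, 10k + 1], B_tk = [100 + k].
term_major = np.array([[0, 1, 10, 11, 20, 21, 100, 101, 102]])
time_major = term_major_to_time_major(term_major, term_dims=[2, 1], history_length=3)
print(time_major[0].tolist())  # [0, 1, 100, 10, 11, 101, 20, 21, 102]
```

Check the frame order (oldest or newest first) expected by the target framework separately, since this sketch only changes how terms and frames are interleaved.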
Observation Delay#
Real robots have sensors with communication delays (WiFi, USB). The delay system models sensor latency by returning observations from earlier timesteps.
Delay Parameters#
delay_min_lag / delay_max_lag (default: 0)
    Lag range in steps. An integer lag is sampled uniformly from
    [min_lag, max_lag] (both inclusive). lag=0 means the current observation,
    lag=2 means the observation from 2 steps ago.
delay_per_env (default: True)
    If True, each environment samples its own lag. If False, all environments
    share the same lag.
delay_hold_prob (default: 0.0)
    Probability in [0, 1] of keeping the previous lag instead of resampling.
delay_update_period (default: 0)
    How often (in steps) to resample the lag. If 0, resample every step.
    If N > 0, the lag stays constant for N steps before being resampled,
    which creates temporally correlated latency patterns.
delay_per_env_phase (default: True)
    If True and delay_update_period > 0, stagger resample timing across
    environments with random phase offsets.
Note
delay_update_period controls how often the lag value is resampled, not
how often observations are refreshed. You still get a new (delayed) observation
every step; only the lag stays constant for N steps before being resampled.
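The interaction of these parameters can be sketched as follows. This is an illustrative model, not mjlab's code: `resample_lags` is a hypothetical helper, the way `delay_hold_prob` combines with `delay_update_period` is an assumption, and the random phase offsets of `delay_per_env_phase` are left out.

```python
import numpy as np


def resample_lags(rng, prev_lags, min_lag, max_lag, hold_prob=0.0, per_env=True):
    """One resampling event for integer lags (illustrative, not mjlab's code)."""
    num_envs = prev_lags.shape[0]
    if per_env:
        # Independent draw per environment; the high bound is exclusive, hence +1.
        new_lags = rng.integers(min_lag, max_lag + 1, size=num_envs)
        hold = rng.random(num_envs) < hold_prob
    else:
        # A single draw (and a single hold decision) shared by all environments.
        new_lags = np.full(num_envs, rng.integers(min_lag, max_lag + 1))
        hold = np.full(num_envs, rng.random() < hold_prob)
    # Where hold is True, keep the previous lag instead of the new draw.
    return np.where(hold, prev_lags, new_lags)


rng = np.random.default_rng(0)
lags = np.zeros(8, dtype=np.int64)
update_period = 4  # assumed meaning: resample on steps 0, 4, 8, ...
history = []
for step in range(12):
    if update_period == 0 or step % update_period == 0:
        lags = resample_lags(rng, lags, min_lag=2, max_lag=3)
    # A delayed observation would still be read every step using `lags`.
    history.append(lags.copy())
```

Within each 4-step window the recorded lags do not change, while an observation would still be produced on every step.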
Visualizing delay (50Hz control = 20ms/step):
Sensor captures:  A     B     C     D     E     F     G     H
                  ↓     ↓     ↓     ↓     ↓     ↓     ↓     ↓
Control steps:    0     1     2     3     4     5     6     7
                  20ms  40ms  60ms  80ms  100ms 120ms 140ms 160ms

No delay (baseline - perfect sensor):
You receive:      A     B     C     D     E     F     G     H
                  ↑ current observation every step

Delay with lag=2:
You receive:      A     A     A     B     C     D     E     F
                  ↑clamp↑     ↑     ↑     ↑     ↑     ↑     ↑
Steps 0-1: lag clamped (buffer not full yet)
Step 2+: 40ms delay, every step gets NEW observation
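The diagram can be reproduced with a few lines of Python. `delayed_stream` is a hypothetical helper for illustration. It models the clamping as "use the oldest available frame", which is an assumption consistent with the rows shown above.

```python
def delayed_stream(captures, lag):
    """Return what a consumer receives at each step for a fixed lag."""
    received = []
    for step in range(len(captures)):
        # Clamp the lag while fewer than `lag` past frames exist.
        effective_lag = min(lag, step)
        received.append(captures[step - effective_lag])
    return received


captures = list("ABCDEFGH")
print("".join(delayed_stream(captures, lag=0)))  # ABCDEFGH
print("".join(delayed_stream(captures, lag=2)))  # AAABCDEF
```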
Example - Camera with 40-60ms latency at 50Hz control:
camera: ObservationTermCfg = ObservationTermCfg(
    func=camera_obs,
    delay_min_lag=2,  # 40ms latency
    delay_max_lag=3,  # 60ms latency
)
Computing Delays from Real-World Latency#
Convert real-world latency to simulation steps:
delay_steps = latency_ms / (1000 / control_hz)
Example at 50Hz control (20ms per step):
- 40ms latency = 40 / 20 = 2 steps
- 60ms latency = 60 / 20 = 3 steps
- 100ms latency = 100 / 20 = 5 steps
Example at 100Hz control (10ms per step):
- 40ms latency = 40 / 10 = 4 steps
- 60ms latency = 60 / 10 = 6 steps
Note
Delays are quantized to control timesteps. At 50Hz control (20ms/step),
you can only represent 0ms, 20ms, 40ms, 60ms, etc. To approximate a 45ms sensor,
use delay_min_lag=2, delay_max_lag=3 which uniformly samples lag ∈ {2, 3}
(both inclusive), giving either 40ms or 60ms delay.
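The conversion and the quantization rule can be wrapped in a small helper. `latency_to_lag_range` is a hypothetical function, not part of mjlab. Rounding the lower bound down and the upper bound up, so that the sampled lags bracket the measured latency, is a design choice of this sketch.

```python
import math


def latency_to_lag_range(min_latency_ms, max_latency_ms, control_hz):
    """Map a latency range in milliseconds to (delay_min_lag, delay_max_lag)."""
    # delay_steps = latency_ms / (1000 / control_hz), written to avoid
    # dividing by a rounded step duration.
    min_lag = math.floor(min_latency_ms * control_hz / 1000)
    max_lag = math.ceil(max_latency_ms * control_hz / 1000)
    return min_lag, max_lag


print(latency_to_lag_range(40, 60, control_hz=50))   # (2, 3)
print(latency_to_lag_range(45, 45, control_hz=50))   # (2, 3)
print(latency_to_lag_range(40, 60, control_hz=100))  # (4, 6)
```

The returned pair can be passed as `delay_min_lag` and `delay_max_lag` on an `ObservationTermCfg`.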
Examples#
Joint encoders (no delay):
joint_pos: ObservationTermCfg = ObservationTermCfg(func=joint_pos)
# delay_min_lag=delay_max_lag=0 by default.
Camera with 40-60ms latency at 50Hz control:
# 40-60ms latency = 2-3 steps at 50Hz (20ms/step)
camera: ObservationTermCfg = ObservationTermCfg(
    func=camera_obs,
    delay_min_lag=2,  # 40ms
    delay_max_lag=3,  # 60ms
)
Mixed system - fast encoders and slow camera:
@dataclass
class PolicyCfg(ObservationGroupCfg):
    # Fast encoders (no delay)
    joint_pos: ObservationTermCfg = ObservationTermCfg(
        func=joint_pos,
    )
    # Camera with 40-80ms latency
    camera: ObservationTermCfg = ObservationTermCfg(
        func=camera_obs,
        delay_min_lag=2,  # 40ms
        delay_max_lag=4,  # 80ms
    )
Processing Pipeline#
Observations flow through this pipeline:
compute → noise → clip → scale → delay → history → flatten
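Why delay before history? The history buffer stacks observations after they have been delayed, so every stacked frame is one the policy could actually have received. This mirrors a real system, where the controller can only buffer sensor readings that have already arrived.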
Example with both:
joint_vel: ObservationTermCfg = ObservationTermCfg(
    func=joint_vel,
    scale=0.1,  # Scale raw values
    delay_min_lag=1,  # 20ms delay at 50Hz
    delay_max_lag=2,  # 40ms delay at 50Hz
    history_length=3,  # Stack 3 delayed frames
    flatten_history_dim=True,
)
# Pipeline:
# 1. compute() returns (num_envs, 12)
# 2. scale: multiply by 0.1
# 3. delay: return observation from 1-2 steps ago
# 4. history: stack last 3 delayed frames → (num_envs, 3, 12)
# 5. flatten: reshape → (num_envs, 36)
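The stage order can be traced end to end with a toy single-environment loop. This is a sketch under simplifying assumptions, not mjlab's implementation: the observation is a one-element array equal to the step index, the lag is fixed at 1 instead of sampled, and the noise and clip stages are skipped.

```python
from collections import deque

import numpy as np

scale, lag, history_length = 0.1, 1, 3

delay_buffer = deque(maxlen=lag + 1)    # scaled frames, newest last
history = deque(maxlen=history_length)  # delayed frames, newest last

outputs = []
for step in range(5):
    obs = np.array([float(step)])  # 1. compute: toy observation = step index
    obs = obs * scale              # 2. scale
    delay_buffer.append(obs)
    # 3. delay: read `lag` frames back, clamped while the buffer fills.
    effective_lag = min(lag, len(delay_buffer) - 1)
    delayed = delay_buffer[-1 - effective_lag]
    # 4. history: backfill on the first frame, then accumulate delayed frames.
    if not history:
        history.extend([delayed] * history_length)
    else:
        history.append(delayed)
    stacked = np.stack(list(history))    # shape (3, 1)
    outputs.append(stacked.reshape(-1))  # 5. flatten -> shape (3,)

print(outputs[-1])
```

At step 4 the flattened output is approximately [0.1, 0.2, 0.3], the scaled observations from steps 1 to 3. The newest stacked frame is one step old because history is applied after delay.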
Performance#
Delay buffers are only created when delay_max_lag > 0. Terms with no delay
(the default) have zero overhead. Similarly, history buffers are only created
when history_length > 0.