# Cleanup Jobs
In MAS, most data is initially only soft-deleted: instead of removing the row, a `deleted_at`, `finished_at`, or `consumed_at` timestamp is set on it. Rows are kept around for a short period of time, for audit purposes or, in some cases, to help with the user experience.

This document describes the cleanup jobs in MAS that hard-delete those stale rows after a retention period, how the existing jobs work, and how to add new ones.
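As a minimal sketch of the two phases, reusing the `table`/`timestamp_col` placeholders from the code examples later in this document (the real jobs delete in bounded batches, as described below):

```sql
-- Soft delete: the application marks the row instead of removing it
UPDATE "table" SET "timestamp_col" = now() WHERE id = $1;

-- Hard delete: a cleanup job later removes rows past their retention period
DELETE FROM "table" WHERE "timestamp_col" < now() - interval '30 days';
```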
## Cleanup Job Architecture
Cleanup jobs are scheduled tasks that hard-delete old data from the database. They follow a consistent pattern:
- **Job struct** in `crates/storage/src/queue/tasks.rs` - defines the job and its queue name
- **Storage trait** in `crates/storage/src/{domain}/` - declares the cleanup method interface
- **PostgreSQL implementation** in `crates/storage-pg/src/{domain}/` - implements the actual cleanup logic
- **Job runner** in `crates/tasks/src/database.rs` - implements the `RunnableJob` trait with batching logic
- **Registration** in `crates/tasks/src/lib.rs` - registers the handler and schedules execution
## All Cleanup Jobs
| Job | Entity | Retention | Notes |
|---|---|---|---|
| `CleanupRevokedOAuthAccessTokensJob` | `oauth2_access_tokens` | 1 hour after `revoked_at` | |
| `CleanupExpiredOAuthAccessTokensJob` | `oauth2_access_tokens` | 30 days after `expires_at` | For idempotency |
| `CleanupRevokedOAuthRefreshTokensJob` | `oauth2_refresh_tokens` | 1 hour after `revoked_at` | |
| `CleanupConsumedOAuthRefreshTokensJob` | `oauth2_refresh_tokens` | 1 hour after `consumed_at` | |
| `CleanupUserRegistrationsJob` | `user_registrations` | 30 days | For abuse investigation |
| `CleanupFinishedCompatSessionsJob` | `compat_sessions` | 30 days after `finished_at` | Cascades to tokens |
| `CleanupFinishedOAuth2SessionsJob` | `oauth2_sessions` | 30 days after `finished_at` | Cascades to tokens |
| `CleanupFinishedUserSessionsJob` | `user_sessions` | 30 days after `finished_at` | Only if no child sessions |
| `CleanupOAuthAuthorizationGrantsJob` | `oauth2_authorization_grants` | 7 days | |
| `CleanupOAuthDeviceCodeGrantsJob` | `oauth2_device_code_grant` | 7 days | |
| `CleanupUserRecoverySessionsJob` | `user_recovery_sessions` | 7 days | Codes expire in 10 min |
| `CleanupUserEmailAuthenticationsJob` | `user_email_authentications` | 7 days | Codes expire in 10 min |
| `CleanupUpstreamOAuthSessionsJob` | `upstream_oauth_authorization_sessions` | 7 days (orphaned) | Where `user_session_id IS NULL` |
| `CleanupUpstreamOAuthLinksJob` | `upstream_oauth_links` | 7 days (orphaned) | Where `user_id IS NULL` |
| `CleanupInactiveOAuth2SessionIpsJob` | `oauth2_sessions.last_active_ip` | 30 days | Clears out IPs after inactivity |
| `CleanupInactiveCompatSessionIpsJob` | `compat_sessions.last_active_ip` | 30 days | Clears out IPs after inactivity |
| `CleanupInactiveUserSessionIpsJob` | `user_sessions.last_active_ip` | 30 days | Clears out IPs after inactivity |
| `CleanupQueueJobsJob` | `queue_jobs` | 30 days | Completed/failed jobs |
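Each row of the table maps to a simple retention rule. For example, the revoked-access-token job corresponds conceptually to the query below (a simplified sketch; the real job deletes in bounded batches using the CTE pattern shown later):

```sql
-- Conceptual form of CleanupRevokedOAuthAccessTokensJob:
-- hard-delete tokens that were revoked more than an hour ago
DELETE FROM oauth2_access_tokens
WHERE revoked_at IS NOT NULL
  AND revoked_at < now() - interval '1 hour';
```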
## Session Cleanup and Backchannel Logout
The session cleanup jobs must preserve the dependency chain required for backchannel logout to work correctly.
### Backchannel Logout Flow
When an upstream IdP sends a backchannel logout notification, MAS must trace through the session hierarchy to find and finish all related sessions:
```
    Upstream IdP logout notification
                    │
                    ▼
┌───────────────────────────────────────┐
│ upstream_oauth_authorization_sessions │
│      (matched by sub/sid claims)      │
└───────────────────┬───────────────────┘
                    │ user_session_id
                    ▼
         ┌─────────────────────┐
         │    user_sessions    │
         │ (browser sessions)  │
         └──────────┬──────────┘
                    │ user_session_id FK
          ┌─────────┴─────────┐
          │                   │
          ▼                   ▼
 ┌─────────────────┐ ┌─────────────────┐
 │ compat_sessions │ │ oauth2_sessions │
 └─────────────────┘ └─────────────────┘
```
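Sketched as SQL, the trace is two lookups (illustrative only: the claim and primary-key column names here are hypothetical, and the real queries live in the repository layer):

```sql
-- 1. Match the upstream authorization session from the logout token's
--    sub/sid claims (claim column names are hypothetical)
SELECT user_session_id
FROM upstream_oauth_authorization_sessions
WHERE id_token_sub = $1 AND id_token_sid = $2;

-- 2. Collect every still-active child session that must be finished
SELECT 'compat' AS kind, compat_session_id AS session_id
FROM compat_sessions
WHERE user_session_id = $3 AND finished_at IS NULL
UNION ALL
SELECT 'oauth2' AS kind, oauth2_session_id AS session_id
FROM oauth2_sessions
WHERE user_session_id = $3 AND finished_at IS NULL;
```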
### Cleanup Order
The cleanup jobs run in an order that respects this hierarchy:
1. **Compat sessions** (`CleanupFinishedCompatSessionsJob`)
   - Also deletes `compat_access_tokens` and `compat_refresh_tokens`
2. **OAuth2 sessions** (`CleanupFinishedOAuth2SessionsJob`)
   - Also deletes `oauth2_access_tokens` and `oauth2_refresh_tokens`
3. **User sessions** (`CleanupFinishedUserSessionsJob`)
   - Only deletes a session if NO `compat_sessions` or `oauth2_sessions` reference it (see the sketch after this list). This can make the job inefficient when many finished `user_sessions` are still referenced by active `compat_sessions` or `oauth2_sessions`.
   - Also deletes `user_session_authentications`
   - Sets `user_session_id` to `NULL` on `upstream_oauth_authorization_sessions` (via `ON DELETE SET NULL`)
4. **Upstream OAuth authorization sessions** (`CleanupUpstreamOAuthSessionsJob`)
   - Only deletes rows whose `user_session_id` is `NULL`, i.e. the authorization session was never finished or its user session has already been cleaned up
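A hedged sketch of the guard used when deleting finished user sessions (simplified from the real batched query; the primary-key column name is illustrative). The `NOT EXISTS` clauses are what enforce the ordering above:

```sql
DELETE FROM user_sessions AS u
WHERE u.finished_at < now() - interval '30 days'
  -- Keep the row while any child session still references it, otherwise
  -- backchannel logout could no longer trace through the hierarchy
  AND NOT EXISTS (
    SELECT 1 FROM compat_sessions c WHERE c.user_session_id = u.user_session_id
  )
  AND NOT EXISTS (
    SELECT 1 FROM oauth2_sessions o WHERE o.user_session_id = u.user_session_id
  );
```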
### Why User Sessions Require Special Handling
The `user_session_id` foreign key links must be preserved for backchannel logout to work:

- Backchannel logout traces: `upstream_oauth_authorization_sessions` → `user_sessions` → `compat_sessions` / `oauth2_sessions`
- If a `user_sessions` row is deleted while child sessions still exist, the link is broken and logout propagation fails
- The `NOT EXISTS` checks in the cleanup query ensure `user_sessions` rows are only deleted after all of their children are cleaned up
- FK constraints (`ON DELETE NO ACTION`) provide a safety net: attempting to delete a referenced `user_sessions` row fails (see the sketch below)
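In the schema, that safety net looks roughly like this (a sketch, not the actual migration; the constraint and referenced column names are illustrative, and `NO ACTION` is also PostgreSQL's default when no `ON DELETE` clause is given):

```sql
-- Deleting a user_sessions row that child sessions still reference
-- will now fail instead of silently breaking the logout chain
ALTER TABLE oauth2_sessions
  ADD CONSTRAINT oauth2_sessions_user_session_id_fkey
  FOREIGN KEY (user_session_id)
  REFERENCES user_sessions (user_session_id)
  ON DELETE NO ACTION;
```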
## Adding a New Cleanup Job

### 1. Add Job Struct

In `crates/storage/src/queue/tasks.rs`:
```rust
/// Cleanup old foo records
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CleanupFooJob;

impl InsertableJob for CleanupFooJob {
    const QUEUE_NAME: &'static str = "cleanup-foo";
}
```
### 2. Add Storage Trait Method

In `crates/storage/src/{domain}/foo.rs`, add the method to the repository trait and to the `repository_impl!` macro:
```rust
/// Delete up to `limit` rows whose timestamp is at or after `since`
/// (if set) and before `until`. Returns the number of rows deleted and
/// the timestamp of the last deleted row, which the caller passes back
/// as `since` for the next batch.
async fn cleanup(
    &mut self,
    since: Option<DateTime<Utc>>,
    until: DateTime<Utc>,
    limit: usize,
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error>;
```
### 3. Implement in PostgreSQL

In `crates/storage-pg/src/{domain}/foo.rs`, use the CTE pattern:
```rust
async fn cleanup(
    &mut self,
    since: Option<DateTime<Utc>>,
    until: DateTime<Utc>,
    limit: usize,
) -> Result<(usize, Option<DateTime<Utc>>), Self::Error> {
    // Lock and select one batch of rows, delete them, and report how many
    // were deleted plus the newest deleted timestamp (the next cursor)
    let res = sqlx::query!(
        r#"
            WITH
              to_delete AS (
                SELECT id, timestamp_col
                FROM table
                WHERE timestamp_col IS NOT NULL
                  AND ($1::timestamptz IS NULL OR timestamp_col >= $1)
                  AND timestamp_col < $2
                ORDER BY timestamp_col ASC
                LIMIT $3
                FOR UPDATE
              ),
              deleted AS (
                DELETE FROM table USING to_delete
                WHERE table.id = to_delete.id
                RETURNING timestamp_col
              )
            SELECT COUNT(*) as "count!", MAX(timestamp_col) as last_timestamp FROM deleted
        "#,
        since,
        until,
        limit as i64,
    )
    .traced()
    .fetch_one(&mut *self.conn)
    .await?;

    Ok((
        res.count.try_into().unwrap_or(usize::MAX),
        res.last_timestamp,
    ))
}
```
### 4. Add Index Migration

Make sure to add an index on the timestamp column used by this cleanup job:
```sql
-- no-transaction
CREATE INDEX CONCURRENTLY IF NOT EXISTS "table_timestamp_idx"
    ON "table" ("timestamp_col")
    WHERE "timestamp_col" IS NOT NULL;
```
The partial index (`WHERE "timestamp_col" IS NOT NULL`) makes queries more efficient by only indexing rows that will actually be cleaned up.
### 5. Implement `RunnableJob`

In `crates/tasks/src/database.rs`:
```rust
#[async_trait]
impl RunnableJob for CleanupFooJob {
    #[tracing::instrument(name = "job.cleanup_foo", skip_all)]
    async fn run(&self, state: &State, context: JobContext) -> Result<(), JobError> {
        // Clean up records older than 30 days
        let until = state.clock.now() - chrono::Duration::days(30);
        let mut total = 0;
        let mut since = None;

        while !context.cancellation_token.is_cancelled() {
            // One repository (and thus one transaction) per batch keeps
            // transactions short-lived
            let mut repo = state.repository().await.map_err(JobError::retry)?;
            let (count, last_timestamp) = repo
                .foo()
                .cleanup(since, until, BATCH_SIZE)
                .await
                .map_err(JobError::retry)?;
            repo.save().await.map_err(JobError::retry)?;

            // Advance the cursor so the next batch starts where this one ended
            since = last_timestamp;
            total += count;

            // A short batch means everything in range has been drained
            if count != BATCH_SIZE {
                break;
            }
        }

        if total == 0 {
            debug!("no foo records to clean up");
        } else {
            info!(count = total, "cleaned up foo records");
        }

        Ok(())
    }

    fn timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(10 * 60))
    }
}
```
### 6. Register and Schedule

In `crates/tasks/src/lib.rs`:
```rust
// Add to the register_handler chain
.register_handler::<mas_storage::queue::CleanupFooJob>()

// Add a schedule to run the job every hour
.add_schedule(
    "cleanup-foo",
    // Choose a minute offset ("XX") so jobs don't all fire at once
    "0 XX * * * *".parse()?,
    mas_storage::queue::CleanupFooJob,
)
```
## Implementation Notes

### Batching Pattern
All cleanup jobs use a batching pattern to avoid long-running transactions:
- Process records in batches (typically 1000 at a time)
- Use a pagination cursor (`since`) to track progress
- Create a new transaction for each batch
- Check for cancellation between batches
- Log the total count at the end
### Retention Policies
Retention periods vary by use case:
- **1 hour**: Revoked/consumed tokens (no longer useful)
- **7 days**: Short-lived grants/codes (abuse investigation)
- **30 days**: Sessions and registrations (longer audit trail)