mas_storage/queue/job.rs
1// Copyright 2025, 2026 Element Creations Ltd.
2// Copyright 2024, 2025 New Vector Ltd.
3//
4// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
5// Please see LICENSE files in the repository root for full details.
6
7//! Repository to interact with jobs in the job queue
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use mas_data_model::Clock;
12use opentelemetry::trace::TraceContextExt;
13use rand_core::RngCore;
14use serde::{Deserialize, Serialize};
15use tracing_opentelemetry::OpenTelemetrySpanExt;
16use ulid::Ulid;
17
18use super::Worker;
19use crate::repository_impl;
20
21/// Represents a job in the job queue
22pub struct Job {
23 /// The ID of the job
24 pub id: Ulid,
25
26 /// The queue on which the job was placed
27 pub queue_name: String,
28
29 /// The payload of the job
30 pub payload: serde_json::Value,
31
32 /// Arbitrary metadata about the job
33 pub metadata: JobMetadata,
34
35 /// Which attempt it is
36 pub attempt: usize,
37}
38
39/// Metadata stored alongside the job
40#[derive(Serialize, Deserialize, Default, Clone, Debug)]
41pub struct JobMetadata {
42 #[serde(default)]
43 trace_id: String,
44
45 #[serde(default)]
46 span_id: String,
47
48 #[serde(default)]
49 trace_flags: u8,
50}
51
52impl JobMetadata {
53 fn new(span_context: &opentelemetry::trace::SpanContext) -> Self {
54 Self {
55 trace_id: span_context.trace_id().to_string(),
56 span_id: span_context.span_id().to_string(),
57 trace_flags: span_context.trace_flags().to_u8(),
58 }
59 }
60
61 /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`]
62 #[must_use]
63 pub fn span_context(&self) -> opentelemetry::trace::SpanContext {
64 use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
65 SpanContext::new(
66 TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID),
67 SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID),
68 TraceFlags::new(self.trace_flags),
69 // Trace context is remote, as it comes from another service/from the database
70 true,
71 TraceState::NONE,
72 )
73 }
74}
75
76/// A trait that represents a job which can be inserted into a queue
77pub trait InsertableJob: Serialize + Send {
78 /// The name of the queue this job belongs to
79 const QUEUE_NAME: &'static str;
80}
81
82/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
83/// worker.
84#[async_trait]
85pub trait QueueJobRepository: Send + Sync {
86 /// The error type returned by the repository.
87 type Error;
88
89 /// Schedule a job to be executed as soon as possible by a worker.
90 ///
91 /// # Parameters
92 ///
93 /// * `rng` - The random number generator used to generate a new job ID
94 /// * `clock` - The clock used to generate timestamps
95 /// * `queue_name` - The name of the queue to schedule the job on
96 /// * `payload` - The payload of the job
97 /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
98 ///
99 /// # Errors
100 ///
101 /// Returns an error if the underlying repository fails.
102 async fn schedule(
103 &mut self,
104 rng: &mut (dyn RngCore + Send),
105 clock: &dyn Clock,
106 queue_name: &str,
107 payload: serde_json::Value,
108 metadata: serde_json::Value,
109 ) -> Result<(), Self::Error>;
110
111 /// Schedule a job to be executed at a later date by a worker.
112 ///
113 /// # Parameters
114 ///
115 /// * `rng` - The random number generator used to generate a new job ID
116 /// * `clock` - The clock used to generate timestamps
117 /// * `queue_name` - The name of the queue to schedule the job on
118 /// * `payload` - The payload of the job
119 /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
120 /// * `scheduled_at` - The date and time to schedule the job for
121 /// * `schedule_name` - The name of the recurring schedule which scheduled
122 /// this job
123 ///
124 /// # Errors
125 ///
126 /// Returns an error if the underlying repository fails.
127 #[allow(clippy::too_many_arguments)]
128 async fn schedule_later(
129 &mut self,
130 rng: &mut (dyn RngCore + Send),
131 clock: &dyn Clock,
132 queue_name: &str,
133 payload: serde_json::Value,
134 metadata: serde_json::Value,
135 scheduled_at: DateTime<Utc>,
136 schedule_name: Option<&str>,
137 ) -> Result<(), Self::Error>;
138
139 /// Reserve multiple jobs from multiple queues
140 ///
141 /// # Parameters
142 ///
143 /// * `clock` - The clock used to generate timestamps
144 /// * `worker` - The worker that is reserving the jobs
145 /// * `queues` - The queues to reserve jobs from
146 /// * `count` - The number of jobs to reserve
147 ///
148 /// # Errors
149 ///
150 /// Returns an error if the underlying repository fails.
151 async fn reserve(
152 &mut self,
153 clock: &dyn Clock,
154 worker: &Worker,
155 queues: &[&str],
156 count: usize,
157 ) -> Result<Vec<Job>, Self::Error>;
158
159 /// Mark a job as completed
160 ///
161 /// # Parameters
162 ///
163 /// * `clock` - The clock used to generate timestamps
164 /// * `id` - The ID of the job to mark as completed
165 ///
166 /// # Errors
167 ///
168 /// Returns an error if the underlying repository fails.
169 async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
170
171 /// Marks a job as failed.
172 ///
173 /// # Parameters
174 ///
175 /// * `clock` - The clock used to generate timestamps
176 /// * `id` - The ID of the job to mark as failed
177 /// * `reason` - The reason for the failure
178 ///
179 /// # Errors
180 ///
181 /// Returns an error if the underlying repository fails.
182 async fn mark_as_failed(
183 &mut self,
184 clock: &dyn Clock,
185 id: Ulid,
186 reason: &str,
187 ) -> Result<(), Self::Error>;
188
189 /// Retry a job.
190 ///
191 /// # Parameters
192 ///
193 /// * `rng` - The random number generator used to generate a new job ID
194 /// * `clock` - The clock used to generate timestamps
195 /// * `id` - The ID of the job to reschedule
196 ///
197 /// # Errors
198 ///
199 /// Returns an error if the underlying repository fails.
200 async fn retry(
201 &mut self,
202 rng: &mut (dyn RngCore + Send),
203 clock: &dyn Clock,
204 id: Ulid,
205 delay: Duration,
206 ) -> Result<(), Self::Error>;
207
208 /// Mark all scheduled jobs past their scheduled date as available to be
209 /// executed.
210 ///
211 /// Returns the number of jobs that were marked as available.
212 ///
213 /// # Errors
214 ///
215 /// Returns an error if the underlying repository fails.
216 async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
217
218 /// Cleanup old completed and failed jobs
219 ///
220 /// This will delete jobs with status 'completed' or 'failed' and IDs up to
221 /// and including `until`. Uses ULID cursor-based pagination for efficiency.
222 ///
223 /// Returns the number of jobs deleted and the cursor for the next batch
224 ///
225 /// # Parameters
226 ///
227 /// * `since`: The cursor to start from (exclusive), or `None` to start from
228 /// the beginning
229 /// * `until`: The maximum ULID to delete (inclusive upper bound)
230 /// * `limit`: The maximum number of jobs to delete in this batch
231 ///
232 /// # Errors
233 ///
234 /// Returns [`Self::Error`] if the underlying repository fails
235 async fn cleanup(
236 &mut self,
237 since: Option<Ulid>,
238 until: Ulid,
239 limit: usize,
240 ) -> Result<(usize, Option<Ulid>), Self::Error>;
241}
242
// Invoke the crate's `repository_impl!` macro for `QueueJobRepository`.
// The method signatures listed here mirror the trait definition one-to-one.
// NOTE(review): presumably the macro generates forwarding impls of the trait
// (e.g. for boxed or borrowed repositories), so this list must be kept in sync
// with the trait whenever a method is added or changed — confirm against
// `crate::repository_impl`.
repository_impl!(QueueJobRepository:
    async fn schedule(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
    ) -> Result<(), Self::Error>;

    async fn schedule_later(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        queue_name: &str,
        payload: serde_json::Value,
        metadata: serde_json::Value,
        scheduled_at: DateTime<Utc>,
        schedule_name: Option<&str>,
    ) -> Result<(), Self::Error>;

    async fn reserve(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
        queues: &[&str],
        count: usize,
    ) -> Result<Vec<Job>, Self::Error>;

    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;

    async fn mark_as_failed(
        &mut self,
        clock: &dyn Clock,
        id: Ulid,
        reason: &str,
    ) -> Result<(), Self::Error>;

    async fn retry(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
        id: Ulid,
        delay: Duration,
    ) -> Result<(), Self::Error>;

    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;

    async fn cleanup(
        &mut self,
        since: Option<Ulid>,
        until: Ulid,
        limit: usize,
    ) -> Result<(usize, Option<Ulid>), Self::Error>;
);
296);
297
298/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
299/// through the [`InsertableJob`] trait. This isn't in the
300/// [`QueueJobRepository`] trait to keep it object safe.
301#[async_trait]
302pub trait QueueJobRepositoryExt: QueueJobRepository {
303 /// Schedule a job to be executed as soon as possible by a worker.
304 ///
305 /// # Parameters
306 ///
307 /// * `rng` - The random number generator used to generate a new job ID
308 /// * `clock` - The clock used to generate timestamps
309 /// * `job` - The job to schedule
310 ///
311 /// # Errors
312 ///
313 /// Returns an error if the underlying repository fails.
314 async fn schedule_job<J: InsertableJob>(
315 &mut self,
316 rng: &mut (dyn RngCore + Send),
317 clock: &dyn Clock,
318 job: J,
319 ) -> Result<(), Self::Error>;
320
321 /// Schedule a job to be executed at a later date by a worker.
322 ///
323 /// # Parameters
324 ///
325 /// * `rng` - The random number generator used to generate a new job ID
326 /// * `clock` - The clock used to generate timestamps
327 /// * `job` - The job to schedule
328 /// * `scheduled_at` - The date and time to schedule the job for
329 ///
330 /// # Errors
331 ///
332 /// Returns an error if the underlying repository fails.
333 async fn schedule_job_later<J: InsertableJob>(
334 &mut self,
335 rng: &mut (dyn RngCore + Send),
336 clock: &dyn Clock,
337 job: J,
338 scheduled_at: DateTime<Utc>,
339 ) -> Result<(), Self::Error>;
340}
341
342#[async_trait]
343impl<T> QueueJobRepositoryExt for T
344where
345 T: QueueJobRepository,
346{
347 #[tracing::instrument(
348 name = "db.queue_job.schedule_job",
349 fields(
350 queue_job.queue_name = J::QUEUE_NAME,
351 ),
352 skip_all,
353 )]
354 async fn schedule_job<J: InsertableJob>(
355 &mut self,
356 rng: &mut (dyn RngCore + Send),
357 clock: &dyn Clock,
358 job: J,
359 ) -> Result<(), Self::Error> {
360 // Grab the span context from the current span
361 let span = tracing::Span::current();
362 let ctx = span.context();
363 let span = ctx.span();
364 let span_context = span.span_context();
365
366 let metadata = JobMetadata::new(span_context);
367 let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
368
369 let payload = serde_json::to_value(job).expect("Could not serialize job");
370 self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
371 .await
372 }
373
374 #[tracing::instrument(
375 name = "db.queue_job.schedule_job_later",
376 fields(
377 queue_job.queue_name = J::QUEUE_NAME,
378 ),
379 skip_all,
380 )]
381 async fn schedule_job_later<J: InsertableJob>(
382 &mut self,
383 rng: &mut (dyn RngCore + Send),
384 clock: &dyn Clock,
385 job: J,
386 scheduled_at: DateTime<Utc>,
387 ) -> Result<(), Self::Error> {
388 // Grab the span context from the current span
389 let span = tracing::Span::current();
390 let ctx = span.context();
391 let span = ctx.span();
392 let span_context = span.span_context();
393
394 let metadata = JobMetadata::new(span_context);
395 let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
396
397 let payload = serde_json::to_value(job).expect("Could not serialize job");
398 self.schedule_later(
399 rng,
400 clock,
401 J::QUEUE_NAME,
402 payload,
403 metadata,
404 scheduled_at,
405 None,
406 )
407 .await
408 }
409}