mas_storage/queue/
job.rs

// Copyright 2025, 2026 Element Creations Ltd.
// Copyright 2024, 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.

//! Repository to interact with jobs in the job queue
8
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use mas_data_model::Clock;
12use opentelemetry::trace::TraceContextExt;
13use rand_core::RngCore;
14use serde::{Deserialize, Serialize};
15use tracing_opentelemetry::OpenTelemetrySpanExt;
16use ulid::Ulid;
17
18use super::Worker;
19use crate::repository_impl;
20
21/// Represents a job in the job queue
22pub struct Job {
23    /// The ID of the job
24    pub id: Ulid,
25
26    /// The queue on which the job was placed
27    pub queue_name: String,
28
29    /// The payload of the job
30    pub payload: serde_json::Value,
31
32    /// Arbitrary metadata about the job
33    pub metadata: JobMetadata,
34
35    /// Which attempt it is
36    pub attempt: usize,
37}
38
39/// Metadata stored alongside the job
40#[derive(Serialize, Deserialize, Default, Clone, Debug)]
41pub struct JobMetadata {
42    #[serde(default)]
43    trace_id: String,
44
45    #[serde(default)]
46    span_id: String,
47
48    #[serde(default)]
49    trace_flags: u8,
50}
51
52impl JobMetadata {
53    fn new(span_context: &opentelemetry::trace::SpanContext) -> Self {
54        Self {
55            trace_id: span_context.trace_id().to_string(),
56            span_id: span_context.span_id().to_string(),
57            trace_flags: span_context.trace_flags().to_u8(),
58        }
59    }
60
61    /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`]
62    #[must_use]
63    pub fn span_context(&self) -> opentelemetry::trace::SpanContext {
64        use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState};
65        SpanContext::new(
66            TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID),
67            SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID),
68            TraceFlags::new(self.trace_flags),
69            // Trace context is remote, as it comes from another service/from the database
70            true,
71            TraceState::NONE,
72        )
73    }
74}
75
76/// A trait that represents a job which can be inserted into a queue
77pub trait InsertableJob: Serialize + Send {
78    /// The name of the queue this job belongs to
79    const QUEUE_NAME: &'static str;
80}
81
82/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a
83/// worker.
84#[async_trait]
85pub trait QueueJobRepository: Send + Sync {
86    /// The error type returned by the repository.
87    type Error;
88
89    /// Schedule a job to be executed as soon as possible by a worker.
90    ///
91    /// # Parameters
92    ///
93    /// * `rng` - The random number generator used to generate a new job ID
94    /// * `clock` - The clock used to generate timestamps
95    /// * `queue_name` - The name of the queue to schedule the job on
96    /// * `payload` - The payload of the job
97    /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the underlying repository fails.
102    async fn schedule(
103        &mut self,
104        rng: &mut (dyn RngCore + Send),
105        clock: &dyn Clock,
106        queue_name: &str,
107        payload: serde_json::Value,
108        metadata: serde_json::Value,
109    ) -> Result<(), Self::Error>;
110
111    /// Schedule a job to be executed at a later date by a worker.
112    ///
113    /// # Parameters
114    ///
115    /// * `rng` - The random number generator used to generate a new job ID
116    /// * `clock` - The clock used to generate timestamps
117    /// * `queue_name` - The name of the queue to schedule the job on
118    /// * `payload` - The payload of the job
119    /// * `metadata` - Arbitrary metadata about the job scheduled immediately.
120    /// * `scheduled_at` - The date and time to schedule the job for
121    /// * `schedule_name` - The name of the recurring schedule which scheduled
122    ///   this job
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if the underlying repository fails.
127    #[allow(clippy::too_many_arguments)]
128    async fn schedule_later(
129        &mut self,
130        rng: &mut (dyn RngCore + Send),
131        clock: &dyn Clock,
132        queue_name: &str,
133        payload: serde_json::Value,
134        metadata: serde_json::Value,
135        scheduled_at: DateTime<Utc>,
136        schedule_name: Option<&str>,
137    ) -> Result<(), Self::Error>;
138
139    /// Reserve multiple jobs from multiple queues
140    ///
141    /// # Parameters
142    ///
143    /// * `clock` - The clock used to generate timestamps
144    /// * `worker` - The worker that is reserving the jobs
145    /// * `queues` - The queues to reserve jobs from
146    /// * `count` - The number of jobs to reserve
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the underlying repository fails.
151    async fn reserve(
152        &mut self,
153        clock: &dyn Clock,
154        worker: &Worker,
155        queues: &[&str],
156        count: usize,
157    ) -> Result<Vec<Job>, Self::Error>;
158
159    /// Mark a job as completed
160    ///
161    /// # Parameters
162    ///
163    /// * `clock` - The clock used to generate timestamps
164    /// * `id` - The ID of the job to mark as completed
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if the underlying repository fails.
169    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
170
171    /// Marks a job as failed.
172    ///
173    /// # Parameters
174    ///
175    /// * `clock` - The clock used to generate timestamps
176    /// * `id` - The ID of the job to mark as failed
177    /// * `reason` - The reason for the failure
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the underlying repository fails.
182    async fn mark_as_failed(
183        &mut self,
184        clock: &dyn Clock,
185        id: Ulid,
186        reason: &str,
187    ) -> Result<(), Self::Error>;
188
189    /// Retry a job.
190    ///
191    /// # Parameters
192    ///
193    /// * `rng` - The random number generator used to generate a new job ID
194    /// * `clock` - The clock used to generate timestamps
195    /// * `id` - The ID of the job to reschedule
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if the underlying repository fails.
200    async fn retry(
201        &mut self,
202        rng: &mut (dyn RngCore + Send),
203        clock: &dyn Clock,
204        id: Ulid,
205        delay: Duration,
206    ) -> Result<(), Self::Error>;
207
208    /// Mark all scheduled jobs past their scheduled date as available to be
209    /// executed.
210    ///
211    /// Returns the number of jobs that were marked as available.
212    ///
213    /// # Errors
214    ///
215    /// Returns an error if the underlying repository fails.
216    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
217
218    /// Cleanup old completed and failed jobs
219    ///
220    /// This will delete jobs with status 'completed' or 'failed' and IDs up to
221    /// and including `until`. Uses ULID cursor-based pagination for efficiency.
222    ///
223    /// Returns the number of jobs deleted and the cursor for the next batch
224    ///
225    /// # Parameters
226    ///
227    /// * `since`: The cursor to start from (exclusive), or `None` to start from
228    ///   the beginning
229    /// * `until`: The maximum ULID to delete (inclusive upper bound)
230    /// * `limit`: The maximum number of jobs to delete in this batch
231    ///
232    /// # Errors
233    ///
234    /// Returns [`Self::Error`] if the underlying repository fails
235    async fn cleanup(
236        &mut self,
237        since: Option<Ulid>,
238        until: Ulid,
239        limit: usize,
240    ) -> Result<(usize, Option<Ulid>), Self::Error>;
241}
242
243repository_impl!(QueueJobRepository:
244    async fn schedule(
245        &mut self,
246        rng: &mut (dyn RngCore + Send),
247        clock: &dyn Clock,
248        queue_name: &str,
249        payload: serde_json::Value,
250        metadata: serde_json::Value,
251    ) -> Result<(), Self::Error>;
252
253    async fn schedule_later(
254        &mut self,
255        rng: &mut (dyn RngCore + Send),
256        clock: &dyn Clock,
257        queue_name: &str,
258        payload: serde_json::Value,
259        metadata: serde_json::Value,
260        scheduled_at: DateTime<Utc>,
261        schedule_name: Option<&str>,
262    ) -> Result<(), Self::Error>;
263
264    async fn reserve(
265        &mut self,
266        clock: &dyn Clock,
267        worker: &Worker,
268        queues: &[&str],
269        count: usize,
270    ) -> Result<Vec<Job>, Self::Error>;
271
272    async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
273
274    async fn mark_as_failed(&mut self,
275        clock: &dyn Clock,
276        id: Ulid,
277        reason: &str,
278    ) -> Result<(), Self::Error>;
279
280    async fn retry(
281        &mut self,
282        rng: &mut (dyn RngCore + Send),
283        clock: &dyn Clock,
284        id: Ulid,
285        delay: Duration,
286    ) -> Result<(), Self::Error>;
287
288    async fn schedule_available_jobs(&mut self, clock: &dyn Clock) -> Result<usize, Self::Error>;
289
290    async fn cleanup(
291        &mut self,
292        since: Option<Ulid>,
293        until: Ulid,
294        limit: usize,
295    ) -> Result<(usize, Option<Ulid>), Self::Error>;
296);
297
298/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
299/// through the [`InsertableJob`] trait. This isn't in the
300/// [`QueueJobRepository`] trait to keep it object safe.
301#[async_trait]
302pub trait QueueJobRepositoryExt: QueueJobRepository {
303    /// Schedule a job to be executed as soon as possible by a worker.
304    ///
305    /// # Parameters
306    ///
307    /// * `rng` - The random number generator used to generate a new job ID
308    /// * `clock` - The clock used to generate timestamps
309    /// * `job` - The job to schedule
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the underlying repository fails.
314    async fn schedule_job<J: InsertableJob>(
315        &mut self,
316        rng: &mut (dyn RngCore + Send),
317        clock: &dyn Clock,
318        job: J,
319    ) -> Result<(), Self::Error>;
320
321    /// Schedule a job to be executed at a later date by a worker.
322    ///
323    /// # Parameters
324    ///
325    /// * `rng` - The random number generator used to generate a new job ID
326    /// * `clock` - The clock used to generate timestamps
327    /// * `job` - The job to schedule
328    /// * `scheduled_at` - The date and time to schedule the job for
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if the underlying repository fails.
333    async fn schedule_job_later<J: InsertableJob>(
334        &mut self,
335        rng: &mut (dyn RngCore + Send),
336        clock: &dyn Clock,
337        job: J,
338        scheduled_at: DateTime<Utc>,
339    ) -> Result<(), Self::Error>;
340}
341
342#[async_trait]
343impl<T> QueueJobRepositoryExt for T
344where
345    T: QueueJobRepository,
346{
347    #[tracing::instrument(
348        name = "db.queue_job.schedule_job",
349        fields(
350            queue_job.queue_name = J::QUEUE_NAME,
351        ),
352        skip_all,
353    )]
354    async fn schedule_job<J: InsertableJob>(
355        &mut self,
356        rng: &mut (dyn RngCore + Send),
357        clock: &dyn Clock,
358        job: J,
359    ) -> Result<(), Self::Error> {
360        // Grab the span context from the current span
361        let span = tracing::Span::current();
362        let ctx = span.context();
363        let span = ctx.span();
364        let span_context = span.span_context();
365
366        let metadata = JobMetadata::new(span_context);
367        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
368
369        let payload = serde_json::to_value(job).expect("Could not serialize job");
370        self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata)
371            .await
372    }
373
374    #[tracing::instrument(
375        name = "db.queue_job.schedule_job_later",
376        fields(
377            queue_job.queue_name = J::QUEUE_NAME,
378        ),
379        skip_all,
380    )]
381    async fn schedule_job_later<J: InsertableJob>(
382        &mut self,
383        rng: &mut (dyn RngCore + Send),
384        clock: &dyn Clock,
385        job: J,
386        scheduled_at: DateTime<Utc>,
387    ) -> Result<(), Self::Error> {
388        // Grab the span context from the current span
389        let span = tracing::Span::current();
390        let ctx = span.context();
391        let span = ctx.span();
392        let span_context = span.span_context();
393
394        let metadata = JobMetadata::new(span_context);
395        let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata");
396
397        let payload = serde_json::to_value(job).expect("Could not serialize job");
398        self.schedule_later(
399            rng,
400            clock,
401            J::QUEUE_NAME,
402            payload,
403            metadata,
404            scheduled_at,
405            None,
406        )
407        .await
408    }
409}