// File: mas_storage/queue/worker.rs
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

//! Repository to interact with workers in the job queue

use async_trait::async_trait;
use chrono::Duration;
use rand_core::RngCore;
use ulid::Ulid;

use crate::{repository_impl, Clock};

/// A worker is an entity which can execute jobs.
pub struct Worker {
    /// The ID of the worker.
    pub id: Ulid,
}

/// A [`QueueWorkerRepository`] is used to manage the workers of the job
/// queue: registering them, recording their heartbeats, shutting them down
/// and managing the leader lease.
#[async_trait]
pub trait QueueWorkerRepository: Send + Sync {
    /// The error type returned by the repository.
    type Error;

    /// Register a new worker.
    ///
    /// Returns the newly registered [`Worker`].
    ///
    /// # Parameters
    ///
    /// * `rng`: The random number generator to use
    /// * `clock`: The clock to use
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn register(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
    ) -> Result<Worker, Self::Error>;

    /// Send a heartbeat for the given worker.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `worker`: The worker for which the heartbeat is sent
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails or if the worker was
    /// shut down.
    async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

    /// Mark the given worker as shut down.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `worker`: The worker to mark as shut down
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

    /// Find dead workers and shut them down.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `threshold`: Presumably how long a worker may go without sending a
    ///   heartbeat before it is considered dead; the exact semantics are
    ///   defined by the implementation (TODO: confirm)
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn shutdown_dead_workers(
        &mut self,
        clock: &dyn Clock,
        threshold: Duration,
    ) -> Result<(), Self::Error>;

    /// Remove the leader lease if it is expired, sending a notification to
    /// trigger a new leader election.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn remove_leader_lease_if_expired(
        &mut self,
        clock: &dyn Clock,
    ) -> Result<(), Self::Error>;

    /// Try to get the leader lease, renewing it if we already have it.
    ///
    /// Returns `true` if we got the leader lease, `false` if we didn't.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock to use
    /// * `worker`: The worker trying to get (or renew) the leader lease
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn try_get_leader_lease(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<bool, Self::Error>;
}

// Invoke the crate's `repository_impl!` macro for `QueueWorkerRepository`.
// The method list below mirrors, signature for signature, the methods
// declared on the trait; keep both in sync when the trait changes.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// generates forwarding implementations of the trait for wrapper types —
// confirm against its definition.
repository_impl!(QueueWorkerRepository:
    async fn register(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
    ) -> Result<Worker, Self::Error>;

    async fn heartbeat(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<(), Self::Error>;

    async fn shutdown(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<(), Self::Error>;

    async fn shutdown_dead_workers(
        &mut self,
        clock: &dyn Clock,
        threshold: Duration,
    ) -> Result<(), Self::Error>;

    async fn remove_leader_lease_if_expired(
        &mut self,
        clock: &dyn Clock,
    ) -> Result<(), Self::Error>;

    async fn try_get_leader_lease(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<bool, Self::Error>;
);