// mas_storage/queue/worker.rs
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.
//! Repository to interact with workers in the job queue
use async_trait::async_trait;
use chrono::Duration;
use rand_core::RngCore;
use ulid::Ulid;
use crate::{repository_impl, Clock};
/// A worker is an entity which can execute jobs.
///
/// A [`Worker`] is returned by [`QueueWorkerRepository::register`] and is
/// passed back by reference to the other repository methods which act on a
/// specific worker (heartbeat, shutdown, leader lease).
///
/// It implements [`Debug`](std::fmt::Debug) so that it can be included in
/// logs and so that `Result<Worker, E>` supports `unwrap_err`/`expect_err`.
#[derive(Debug)]
pub struct Worker {
    /// The ID of the worker.
    pub id: Ulid,
}
/// A [`QueueWorkerRepository`] is used to manage the workers of the job
/// queue: registering them, sending their heartbeats, shutting them down, and
/// handling the leader lease.
#[async_trait]
pub trait QueueWorkerRepository: Send + Sync {
    /// The error type returned by the repository.
    type Error;

    /// Register a new worker.
    ///
    /// On success, the newly registered [`Worker`] is returned.
    ///
    /// # Parameters
    ///
    /// * `rng`: The random number generator handed to the repository
    /// * `clock`: The clock handed to the repository
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn register(
        &mut self,
        rng: &mut (dyn RngCore + Send),
        clock: &dyn Clock,
    ) -> Result<Worker, Self::Error>;

    /// Send a heartbeat for the given worker.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock handed to the repository
    /// * `worker`: The worker the heartbeat is sent for
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails, or if the worker
    /// was shut down.
    async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

    /// Mark the given worker as shut down.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock handed to the repository
    /// * `worker`: The worker to mark as shut down
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;

    /// Find dead workers and shut them down.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock handed to the repository
    /// * `threshold`: The duration threshold used when looking for dead
    ///   workers
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn shutdown_dead_workers(
        &mut self,
        clock: &dyn Clock,
        threshold: Duration,
    ) -> Result<(), Self::Error>;

    /// Remove the leader lease if it is expired, sending a notification to
    /// trigger a new leader election.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock handed to the repository
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn remove_leader_lease_if_expired(
        &mut self,
        clock: &dyn Clock,
    ) -> Result<(), Self::Error>;

    /// Try to get the leader lease, renewing it if we already have it.
    ///
    /// Returns `true` if we got the leader lease, `false` if we didn't.
    ///
    /// # Parameters
    ///
    /// * `clock`: The clock handed to the repository
    /// * `worker`: The worker trying to get the leader lease
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying repository fails.
    async fn try_get_leader_lease(
        &mut self,
        clock: &dyn Clock,
        worker: &Worker,
    ) -> Result<bool, Self::Error>;
}
// Invoke the crate's `repository_impl!` macro for `QueueWorkerRepository`.
//
// The signatures listed below mirror the methods of the trait one for one
// (same names, parameters and return types); keep the two lists in sync when
// the trait changes.
//
// NOTE(review): the macro is defined elsewhere in the crate. Presumably it
// generates forwarding implementations of the trait for wrapper types;
// confirm against its definition.
repository_impl!(QueueWorkerRepository:
async fn register(
&mut self,
rng: &mut (dyn RngCore + Send),
clock: &dyn Clock,
) -> Result<Worker, Self::Error>;
async fn heartbeat(
&mut self,
clock: &dyn Clock,
worker: &Worker,
) -> Result<(), Self::Error>;
async fn shutdown(
&mut self,
clock: &dyn Clock,
worker: &Worker,
) -> Result<(), Self::Error>;
async fn shutdown_dead_workers(
&mut self,
clock: &dyn Clock,
threshold: Duration,
) -> Result<(), Self::Error>;
async fn remove_leader_lease_if_expired(
&mut self,
clock: &dyn Clock,
) -> Result<(), Self::Error>;
async fn try_get_leader_lease(
&mut self,
clock: &dyn Clock,
worker: &Worker,
) -> Result<bool, Self::Error>;
);