syn2mas/mas_writer/
constraint_pausing.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use std::time::Instant;

use sqlx::PgConnection;
use tracing::{debug, info};

use super::{Error, IntoDatabase};

/// Description of a table constraint, holding everything needed to drop it
/// and recreate it later.
///
/// Values of this type are produced in this file by
/// [`describe_constraints_on_table`] and
/// [`describe_foreign_key_constraints_to_table`].
#[derive(Debug, Clone)]
pub struct ConstraintDescription {
    /// Name of the constraint (`pg_constraint.conname`).
    pub name: String,
    /// Table the constraint is defined on, rendered by PostgreSQL as
    /// `conrelid::regclass::text`. It is interpolated verbatim into
    /// `ALTER TABLE` statements.
    pub table_name: String,
    /// Constraint definition as returned by `pg_get_constraintdef`, i.e. the
    /// text that follows `ADD CONSTRAINT <name>` when recreating it.
    pub definition: String,
}

/// Description of an index, holding everything needed to drop it and
/// recreate it later.
///
/// Values of this type are produced in this file by
/// [`describe_indices_on_table`].
#[derive(Debug, Clone)]
pub struct IndexDescription {
    /// Name of the index (`pg_indexes.indexname`).
    pub name: String,
    /// Label of the table the index is associated with. Within this file it
    /// is only read when building error messages.
    pub table_name: String,
    /// Index definition (`pg_indexes.indexdef`); it is executed as a
    /// complete statement when the index is recreated.
    pub definition: String,
}

/// Lists the foreign-key, primary-key and unique constraints defined on
/// `table_name`.
///
/// Only constraints whose namespace is the current schema are returned. Each
/// entry carries the constraint name, the owning table and the definition
/// reported by `pg_get_constraintdef`, which is enough to recreate the
/// constraint after it has been dropped.
///
/// # Errors
///
/// Returns a database error if the catalogue query fails.
pub async fn describe_constraints_on_table(
    conn: &mut PgConnection,
    table_name: &str,
) -> Result<Vec<ConstraintDescription>, Error> {
    // `contype IN ('f', 'p', 'u')` restricts the lookup to foreign-key,
    // primary-key and unique constraints.
    let lookup = sqlx::query_as!(
        ConstraintDescription,
        r#"
            SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
            FROM pg_constraint c
            JOIN pg_namespace n ON n.oid = c.connamespace
            WHERE contype IN ('f', 'p', 'u') AND conrelid::regclass::text = $1
            AND n.nspname = current_schema;
        "#,
        table_name
    );

    let constraints = lookup
        .fetch_all(&mut *conn)
        .await
        .into_database_with(|| format!("could not read constraint definitions of {table_name}"))?;

    Ok(constraints)
}

/// Lists the foreign-key constraints that reference `target_table_name`.
///
/// The `table_name` of each returned description is the table that *owns*
/// the constraint (the referencing side), not the target table. Only
/// constraints whose namespace is the current schema are returned.
///
/// # Errors
///
/// Returns a database error if the catalogue query fails.
pub async fn describe_foreign_key_constraints_to_table(
    conn: &mut PgConnection,
    target_table_name: &str,
) -> Result<Vec<ConstraintDescription>, Error> {
    // `confrelid` is the referenced table, so this matches foreign keys that
    // point *at* the target rather than those defined on it.
    let lookup = sqlx::query_as!(
        ConstraintDescription,
        r#"
            SELECT conrelid::regclass::text AS "table_name!", conname AS "name!", pg_get_constraintdef(c.oid) AS "definition!"
            FROM pg_constraint c
            JOIN pg_namespace n ON n.oid = c.connamespace
            WHERE contype = 'f' AND confrelid::regclass::text = $1
            AND n.nspname = current_schema;
        "#,
        target_table_name
    );

    let referencing = lookup
        .fetch_all(&mut *conn)
        .await
        .into_database_with(|| format!("could not read FK constraint definitions targetting {target_table_name}"))?;

    Ok(referencing)
}

/// Look up and return the definitions of all indices on a given table.
///
/// Only indices in the current schema are considered. Each returned
/// description carries the index name, its definition (`indexdef`) and the
/// name of the table it belongs to.
///
/// # Errors
///
/// Returns a database error if the catalogue query fails.
pub async fn describe_indices_on_table(
    conn: &mut PgConnection,
    table_name: &str,
) -> Result<Vec<IndexDescription>, Error> {
    let mut indices: Vec<IndexDescription> = sqlx::query_as!(
        IndexDescription,
        r#"
            SELECT indexname AS "name!", indexdef AS "definition!", schemaname AS "table_name!"
            FROM pg_indexes
            WHERE schemaname = current_schema AND tablename = $1 AND indexname IS NOT NULL AND indexdef IS NOT NULL
        "#,
        table_name
    ).fetch_all(&mut *conn).await.into_database("cannot search for indices")?;

    // The query aliases `schemaname` (not `tablename`) as `table_name`, so
    // the rows come back carrying the schema name in that field. Every row
    // matched `tablename = $1`, so the requested table name is the correct
    // value for all of them.
    //
    // NOTE(review): the cleaner fix is to select `tablename` in the SQL. The
    // query text is left untouched here because `query_as!` checks the exact
    // statement at compile time, which presumably relies on prepared query
    // metadata that would need regenerating — confirm and switch if
    // convenient.
    for index in &mut indices {
        index.table_name = table_name.to_owned();
    }

    Ok(indices)
}

/// Drops a constraint from the database.
///
/// The constraint must exist prior to this call.
pub async fn drop_constraint(
    conn: &mut PgConnection,
    constraint: &ConstraintDescription,
) -> Result<(), Error> {
    let name = &constraint.name;
    let table_name = &constraint.table_name;
    debug!("dropping constraint {name} on table {table_name}");
    sqlx::query(&format!("ALTER TABLE {table_name} DROP CONSTRAINT {name};"))
        .execute(&mut *conn)
        .await
        .into_database_with(|| format!("failed to drop constraint {name} on {table_name}"))?;

    Ok(())
}

/// Drops an index from the database.
///
/// The index must exist prior to this call.
///
/// The index name is emitted as a quoted identifier, so names that contain
/// upper-case characters, reserved words or special characters are matched
/// exactly instead of being case-folded or rejected by the parser.
///
/// # Errors
///
/// Returns a database error if the `DROP INDEX` statement fails.
pub async fn drop_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
    let index_name = &index.name;
    debug!("dropping index {index_name}");

    // Quote the raw catalogue name as an identifier, doubling any embedded
    // double quotes.
    let quoted_name = format!("\"{}\"", index_name.replace('"', "\"\""));
    let statement = format!("DROP INDEX {quoted_name};");

    sqlx::query(&statement)
        .execute(&mut *conn)
        .await
        .into_database_with(|| format!("failed to temporarily drop {index_name}"))?;

    Ok(())
}

/// Restores (recreates) a constraint.
///
/// The constraint must not exist prior to this call.
///
/// The constraint is recreated on `constraint.table_name` from its stored
/// definition. The constraint name is emitted as a quoted identifier, so it
/// is recreated with exactly the same name even if it contains upper-case
/// characters, reserved words or special characters. The time taken is
/// logged at `info` level.
///
/// # Errors
///
/// Returns a database error if the `ALTER TABLE … ADD CONSTRAINT` statement
/// fails.
#[tracing::instrument(name = "syn2mas.restore_constraint", skip_all, fields(constraint.name = constraint.name))]
pub async fn restore_constraint(
    conn: &mut PgConnection,
    constraint: &ConstraintDescription,
) -> Result<(), Error> {
    let start = Instant::now();

    let ConstraintDescription {
        name,
        table_name,
        definition,
    } = constraint;
    info!("rebuilding constraint {name}");

    // Quote the raw catalogue name as an identifier, doubling any embedded
    // double quotes. `table_name` and `definition` are interpolated as-is
    // (assumes they are the PostgreSQL-rendered values captured when the
    // constraint was described).
    let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
    let statement = format!("ALTER TABLE {table_name} ADD CONSTRAINT {quoted_name} {definition};");

    sqlx::query(&statement)
        .execute(conn)
        .await
        .into_database_with(|| {
            format!("failed to recreate constraint {name} on {table_name} with {definition}")
        })?;

    info!(
        "constraint {name} rebuilt in {:.1}s",
        start.elapsed().as_secs_f64()
    );

    Ok(())
}

/// Restores (recreates) an index.
///
/// The index must not exist prior to this call.
///
/// The stored definition is executed as a complete statement, and the time
/// taken is logged at `info` level.
///
/// # Errors
///
/// Returns a database error if executing the index definition fails.
#[tracing::instrument(name = "syn2mas.restore_index", skip_all, fields(index.name = index.name))]
pub async fn restore_index(conn: &mut PgConnection, index: &IndexDescription) -> Result<(), Error> {
    let started_at = Instant::now();

    let name = &index.name;
    let table_name = &index.table_name;
    let definition = &index.definition;

    // The definition already is a full statement; only the terminator is
    // appended.
    let statement = format!("{definition};");
    let outcome = sqlx::query(&statement).execute(conn).await;
    outcome.into_database_with(|| {
        format!("failed to recreate index {name} on {table_name} with {definition}")
    })?;

    info!(
        "index {name} rebuilt in {:.1}s",
        started_at.elapsed().as_secs_f64()
    );

    Ok(())
}