// backend/main.rs

1use axum::extract::Request;
2
3use axum::middleware::{Next, from_fn_with_state};
4use axum::{Router, routing::get};
5use axum::{extract::State, response::IntoResponse};
6use clap::{Parser, Subcommand};
7use firebase_auth::FirebaseAuth;
8use http::StatusCode;
9use socketioxide::SocketIo;
10use socketioxide::layer::SocketIoLayer;
11use sqlx::postgres::PgPoolOptions;
12use sqlx::{PgPool, Postgres};
13use sqlx_migrator::cli::MigrationCommand;
14use sqlx_migrator::migrator::{Migrate, Migrator};
15use sqlx_migrator::{Info, Plan};
16use std::path::Path;
17use std::sync::Arc;
18use tokio::sync::watch;
19use tower::ServiceBuilder;
20use tower_http::cors::CorsLayer;
21use tracing::{error, info};
22use tracing_subscriber::filter::{EnvFilter, LevelFilter};
23
24mod app;
25mod auth;
26mod document;
27mod migrations;
28mod rpc;
29mod socket;
30mod user;
31
32use app::AppStatus;
33
34/// Port for the web server providing the RPC API.
35fn web_port() -> String {
36    dotenvy::var("PORT").unwrap_or("8000".to_string())
37}
38
39/// Port for internal communication with the Automerge doc server.
40///
41/// This port should *not* be open to the public.
42fn automerge_io_port() -> String {
43    dotenvy::var("AUTOMERGE_IO_PORT").unwrap_or("3000".to_string())
44}
45
/// Command-line interface for the CatColab binary.
///
/// When no subcommand is given, the server is started (see [`Command::Serve`]).
#[derive(Parser, Debug)]
#[command(name = "catcolab")]
#[command(about = "CatColab server and migration CLI", version)]
struct Cli {
    /// Optional subcommand; `main` falls back to `Command::Serve` when absent.
    #[command(subcommand)]
    command: Option<Command>,
}
53
/// Top-level subcommands accepted by the CatColab binary.
#[derive(Subcommand, Debug)]
enum Command {
    /// Run database migrations (proxied to sqlx_migrator)
    Migrator(MigrationCommand),
    /// Start the web server (default)
    Serve,
}
61
62#[tokio::main]
63async fn main() {
64    let env_filter = EnvFilter::builder()
65        .with_default_directive(LevelFilter::INFO.into())
66        .from_env_lossy();
67
68    tracing_subscriber::fmt().with_env_filter(env_filter).init();
69
70    let db = PgPoolOptions::new()
71        .max_connections(10)
72        .connect(&dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` should be set"))
73        .await
74        .expect("Failed to connect to database");
75
76    let cli = Cli::parse();
77
78    let mut migrator = Migrator::default();
79    migrator
80        .add_migrations(migrations::migrations())
81        .expect("Failed to load migrations");
82
83    match cli.command.unwrap_or(Command::Serve) {
84        Command::Migrator(cmd) => {
85            let mut conn = db.acquire().await.expect("Failed to acquire DB connection");
86
87            cmd.run(&mut *conn, Box::new(migrator)).await.unwrap();
88            return;
89        }
90
91        Command::Serve => {
92            let (io_layer, io) = SocketIo::new_layer();
93
94            let (status_tx, status_rx) = watch::channel(AppStatus::Starting);
95            let state = app::AppState {
96                automerge_io: io,
97                db: db.clone(),
98                app_status: status_rx.clone(),
99            };
100
101            // We need to wrap FirebaseAuth in an Arc because if it's ever dropped the process which updates it's
102            // jwt keys will be killed. The library is using the anti pattern of implementing both Clone and Drop on the
103            // same struct.
104            // https://github.com/trchopan/firebase-auth/issues/30
105            let firebase_auth = Arc::new(
106                FirebaseAuth::new(
107                    &dotenvy::var("FIREBASE_PROJECT_ID")
108                        .expect("`FIREBASE_PROJECT_ID` should be set"),
109                )
110                .await,
111            );
112
113            socket::setup_automerge_socket(state.clone());
114
115            tokio::try_join!(
116                run_migrator_apply(db.clone(), migrator, status_tx.clone()),
117                run_web_server(state.clone(), firebase_auth.clone()),
118                run_automerge_socket(io_layer),
119            )
120            .unwrap();
121        }
122    }
123}
124
125async fn run_migrator_apply(
126    db: PgPool,
127    migrator: Migrator<Postgres>,
128    status_tx: watch::Sender<AppStatus>,
129) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
130    status_tx.send(AppStatus::Migrating)?;
131    info!("Applying database migrations...");
132
133    let mut conn = db.acquire().await?;
134    migrator.run(&mut conn, &Plan::apply_all()).await.unwrap();
135
136    status_tx.send(AppStatus::Running)?;
137    sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
138    info!("Migrations complete");
139
140    Ok(())
141}
142
143async fn app_status_gate(
144    State(status_rx): State<watch::Receiver<AppStatus>>,
145    req: Request,
146    next: Next,
147) -> impl IntoResponse {
148    // Combining the following 2 lines will anger the rust gods
149    let status = status_rx.borrow().clone();
150    match status {
151        AppStatus::Running => next.run(req).await,
152        AppStatus::Failed(reason) => {
153            (StatusCode::INTERNAL_SERVER_ERROR, format!("App failed to start: {reason}"))
154                .into_response()
155        }
156        AppStatus::Starting | AppStatus::Migrating => {
157            (StatusCode::SERVICE_UNAVAILABLE, "Server not ready yet").into_response()
158        }
159    }
160}
161
162async fn auth_middleware(
163    State(firebase_auth): State<Arc<FirebaseAuth>>,
164    mut req: Request<axum::body::Body>,
165    next: axum::middleware::Next,
166) -> impl IntoResponse {
167    match auth::authenticate_from_request(&firebase_auth, &req) {
168        Ok(Some(user)) => {
169            req.extensions_mut().insert(user);
170        }
171        Ok(_) => {}
172        Err(err) => {
173            error!("Authentication error: {err}");
174        }
175    }
176
177    next.run(req).await
178}
179
180async fn status_handler(State(status_rx): State<watch::Receiver<AppStatus>>) -> String {
181    match status_rx.borrow().clone() {
182        AppStatus::Starting => "Starting".into(),
183        AppStatus::Migrating => "Migrating".into(),
184        AppStatus::Running => "Running".into(),
185        AppStatus::Failed(reason) => format!("Failed: {reason}"),
186    }
187}
188
189use axum::routing::get_service;
190use tower_http::services::{ServeDir, ServeFile};
191
/// Run the public-facing web server: RPC API, `/status` endpoint, and
/// (when `SPA_DIR` is configured) the static frontend.
///
/// Binds to `0.0.0.0` on the port from [`web_port`]. Returns when the server
/// stops or an error occurs while binding/serving.
async fn run_web_server(
    state: app::AppState,
    firebase_auth: Arc<FirebaseAuth>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let port = web_port();
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")).await?;

    let rpc_router = rpc::router();
    let (qubit_service, qubit_handle) = rpc_router.to_service(state.clone());

    // Middleware order for /rpc: readiness gate first, then authentication,
    // then the RPC service itself.
    let rpc_with_mw = ServiceBuilder::new()
        .layer(from_fn_with_state(state.app_status.clone(), app_status_gate))
        .layer(from_fn_with_state(firebase_auth, auth_middleware))
        .service(qubit_service);

    // NOTE: Currently nothing is using the /status endpoint. It will likely be used in the future by
    // tests.
    let status_router = Router::new()
        .route("/status", get(status_handler))
        .with_state(state.app_status.clone());

    let mut app = Router::new().merge(status_router).nest_service("/rpc", rpc_with_mw);

    if let Some(spa_dir) = spa_directory() {
        // Serve the single-page app; unknown paths fall back to index.html so
        // client-side routing works.
        let index = Path::new(&spa_dir).join("index.html");
        let spa_service =
            get_service(ServeDir::new(&spa_dir).not_found_service(ServeFile::new(index)));

        info!("Serving frontend from directory: {spa_dir}");
        app = app.nest_service("/", spa_service);
    } else {
        info!("frontend directory not found; keeping default text route at /");
        app = app.route("/", get(|| async { "Hello! The CatColab server is running" }));
    }

    app = app.layer(CorsLayer::permissive());

    info!("Web server listening at port {port}");

    axum::serve(listener, app).await?;
    // Shut down the RPC handler once the server stops accepting connections.
    qubit_handle.stop().ok();

    Ok(())
}
236
237fn spa_directory() -> Option<String> {
238    // NOTE: using an environment variable allows us to set the frontend at runtime, which will prevent
239    // possible issues with circular dependencies in the future. (Currently the frontend dependency
240    // catcolab-api, which is built by the backend, is tracked in git)
241    if let Ok(candidate) = dotenvy::var("SPA_DIR") {
242        let path = Path::new(&candidate);
243        if path.exists() && path.is_dir() {
244            return Some(candidate);
245        }
246    }
247    None
248}
249
250async fn run_automerge_socket(
251    io_layer: SocketIoLayer,
252) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
253    let port = automerge_io_port();
254    let listener = tokio::net::TcpListener::bind(format!("localhost:{port}")).await?;
255    let app = Router::new().layer(io_layer);
256    info!("Automerge socket listening at port {port}");
257    axum::serve(listener, app).await?;
258    Ok(())
259}