catcolab_backend/
main.rs

1use axum::{Router, routing::get};
2use firebase_auth::FirebaseAuth;
3use socketioxide::SocketIo;
4use sqlx::postgres::PgPoolOptions;
5use std::sync::Arc;
6use tower::ServiceBuilder;
7use tower_http::cors::CorsLayer;
8use tracing::{error, info};
9use tracing_subscriber::filter::{EnvFilter, LevelFilter};
10
11mod app;
12mod auth;
13mod document;
14mod rpc;
15mod socket;
16mod user;
17
18/// Port for the web server providing the RPC API.
19fn web_port() -> String {
20    dotenvy::var("PORT").unwrap_or("8000".to_string())
21}
22
23/** Port for internal communication with the Automerge doc server.
24
25This port should *not* be open to the public.
26*/
27fn automerge_io_port() -> String {
28    dotenvy::var("AUTOMERGE_IO_PORT").unwrap_or("3000".to_string())
29}
30
31#[tokio::main]
32async fn main() {
33    let env_filter = EnvFilter::builder()
34        .with_default_directive(LevelFilter::INFO.into())
35        .from_env_lossy();
36
37    tracing_subscriber::fmt().with_env_filter(env_filter).init();
38
39    let db = PgPoolOptions::new()
40        .max_connections(10)
41        .connect(&dotenvy::var("DATABASE_URL").expect("`DATABASE_URL` should be set"))
42        .await
43        .expect("Failed to connect to database");
44
45    let (io_layer, io) = SocketIo::new_layer();
46
47    let state = app::AppState {
48        automerge_io: io,
49        db,
50    };
51
52    // We need to wrap FirebaseAuth in an Arc because if it's ever dropped the process which updates it's
53    // jwt keys will be killed. The library is using the anti pattern of implementing both Clone and Drop on the
54    // same struct.
55    // https://github.com/trchopan/firebase-auth/issues/30
56    let firebase_auth =
57        Arc::new(FirebaseAuth::new(&dotenvy::var("FIREBASE_PROJECT_ID").unwrap()).await);
58
59    socket::setup_automerge_socket(state.clone());
60
61    let main_task = tokio::task::spawn(async {
62        let port = web_port();
63        let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)).await.unwrap();
64
65        let router = rpc::router();
66        let (qubit_service, qubit_handle) = router.to_service(state);
67        let qubit_service = ServiceBuilder::new()
68            .map_request(move |mut req: hyper::Request<_>| {
69                match auth::authenticate_from_request(&firebase_auth, &req) {
70                    Ok(Some(user)) => {
71                        req.extensions_mut().insert(user);
72                    }
73                    Ok(None) => {}
74                    Err(err) => {
75                        error!("Authentication error: {}", err);
76                    }
77                };
78                req
79            })
80            .service(qubit_service);
81
82        let app = Router::new()
83            .route("/", get(|| async { "Hello! The CatColab server is running" }))
84            .nest_service("/rpc", qubit_service)
85            .layer(CorsLayer::permissive());
86        info!("Web server listening at port {}", port);
87        axum::serve(listener, app).await.unwrap();
88
89        qubit_handle.stop().unwrap();
90    });
91
92    let automerge_io_task = tokio::task::spawn(async {
93        let port = automerge_io_port();
94        let listener = tokio::net::TcpListener::bind(format!("localhost:{}", port)).await.unwrap();
95        let app = Router::new().layer(io_layer);
96        info!("Automerge socket listening at port {}", port);
97        axum::serve(listener, app).await.unwrap();
98    });
99
100    let (res_main, res_io) = tokio::join!(main_task, automerge_io_task);
101    res_main.unwrap();
102    res_io.unwrap();
103}