diff --git a/Cargo.lock b/Cargo.lock index 8245cfff0d..253653b0eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2631,6 +2631,29 @@ dependencies = [ "serde", ] +[[package]] +name = "elasticsearch" +version = "9.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12bb303aa6e1d28c0c86b6fbfe484fd0fd3f512629aeed1ac4f6b85f81d9834a" +dependencies = [ + "base64 0.22.1", + "bytes", + "dyn-clone", + "flate2", + "lazy_static", + "parking_lot", + "percent-encoding", + "reqwest", + "rustc_version", + "serde", + "serde_json", + "serde_with", + "tokio", + "url", + "void", +] + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -4748,6 +4771,7 @@ dependencies = [ "dotenv-build", "dotenvy", "either", + "elasticsearch", "eyre", "futures", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index d5d9ba9131..68ff994185 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ dotenv-build = "0.1.1" dotenvy = "0.15.7" dunce = "1.0.5" either = "1.15.0" +elasticsearch = "9.1.0-alpha.1" encoding_rs = "0.8.35" enumset = "1.1.10" eyre = "0.6.12" diff --git a/apps/labrinth/.env.docker-compose b/apps/labrinth/.env.docker-compose index 5198de5ad7..cc7ce6fd18 100644 --- a/apps/labrinth/.env.docker-compose +++ b/apps/labrinth/.env.docker-compose @@ -16,9 +16,14 @@ DATABASE_URL=postgresql://labrinth:labrinth@labrinth-postgres/labrinth DATABASE_MIN_CONNECTIONS=0 DATABASE_MAX_CONNECTIONS=16 +SEARCH_BACKEND=meilisearch MEILISEARCH_READ_ADDR=http://localhost:7700 MEILISEARCH_WRITE_ADDRS=http://localhost:7700 MEILISEARCH_KEY=modrinth +ELASTICSEARCH_URL=http://localhost:9200 +ELASTICSEARCH_INDEX_PREFIX=labrinth +ELASTICSEARCH_USERNAME=elastic +ELASTICSEARCH_PASSWORD=elastic REDIS_URL=redis://labrinth-redis REDIS_MIN_CONNECTIONS=0 diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index 0040a36ba5..a0babd129b 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -16,16 +16,26 @@ 
DATABASE_URL=postgresql://labrinth:labrinth@localhost/labrinth DATABASE_MIN_CONNECTIONS=0 DATABASE_MAX_CONNECTIONS=16 +SEARCH_BACKEND=meilisearch + +# Meilisearch configuration MEILISEARCH_READ_ADDR=http://localhost:7700 MEILISEARCH_WRITE_ADDRS=http://localhost:7700 # 5 minutes in milliseconds SEARCH_OPERATION_TIMEOUT=300000 # # For a sharded Meilisearch setup (sharded-meilisearch docker compose profile) # MEILISEARCH_READ_ADDR=http://localhost:7710 # MEILISEARCH_WRITE_ADDRS=http://localhost:7700,http://localhost:7701 MEILISEARCH_KEY=modrinth +MEILISEARCH_META_NAMESPACE= + +# Elasticsearch configuration +ELASTICSEARCH_URL=http://localhost:9200 +ELASTICSEARCH_INDEX_PREFIX=labrinth +ELASTICSEARCH_USERNAME= +ELASTICSEARCH_PASSWORD= REDIS_URL=redis://localhost REDIS_MIN_CONNECTIONS=0 diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index 76cbff499e..46fb849297 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -43,6 +43,7 @@ deadpool-redis.workspace = true derive_more = { workspace = true, features = ["deref", "deref_mut"] } dotenvy = { workspace = true } either = { workspace = true } +elasticsearch = { workspace = true, features = ["experimental-apis"] } eyre = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } diff --git a/apps/labrinth/src/auth/mod.rs b/apps/labrinth/src/auth/mod.rs 
index f34f7ddb1f..d996afb1d5 100644 --- a/apps/labrinth/src/auth/mod.rs +++ b/apps/labrinth/src/auth/mod.rs @@ -20,8 +20,6 @@ use thiserror::Error; pub enum AuthenticationError { #[error(transparent)] Internal(#[from] eyre::Report), - #[error("Environment Error")] - Env(#[from] dotenvy::Error), #[error("An unknown database error occurred: {0}")] Sqlx(#[from] sqlx::Error), #[error("Database Error: {0}")] @@ -58,7 +56,6 @@ impl actix_web::ResponseError for AuthenticationError { AuthenticationError::Internal(..) => { StatusCode::INTERNAL_SERVER_ERROR } - AuthenticationError::Env(..) => StatusCode::INTERNAL_SERVER_ERROR, AuthenticationError::Sqlx(..) => StatusCode::INTERNAL_SERVER_ERROR, AuthenticationError::Database(..) => { StatusCode::INTERNAL_SERVER_ERROR @@ -94,7 +91,6 @@ impl AuthenticationError { pub fn error_name(&self) -> &'static str { match self { AuthenticationError::Internal(..) => "internal_error", - AuthenticationError::Env(..) => "environment_error", AuthenticationError::Sqlx(..) => "database_error", AuthenticationError::Database(..) => "database_error", AuthenticationError::SerDe(..) 
=> "invalid_input", diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 1783e36e26..7ba6c1f3a8 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -1,3 +1,4 @@ +use crate::database; use crate::database::PgPool; use crate::database::redis::RedisPool; use crate::queue::billing::{index_billing, index_subscriptions}; @@ -7,9 +8,9 @@ use crate::queue::payouts::{ insert_bank_balances_and_webhook, process_affiliate_payouts, process_payout, remove_payouts_for_refunded_charges, }; -use crate::search::indexing::index_projects; +use crate::search::SearchBackend; use crate::util::anrok; -use crate::{database, search}; +use actix_web::web; use clap::ValueEnum; use tracing::{error, info, warn}; @@ -34,18 +35,19 @@ impl BackgroundTask { pool: PgPool, ro_pool: PgPool, redis_pool: RedisPool, - search_config: search::SearchConfig, + search_backend: web::Data, clickhouse: clickhouse::Client, stripe_client: stripe::Client, anrok_client: anrok::Client, email_queue: EmailQueue, mural_client: muralpay::Client, - ) { + ) -> eyre::Result<()> { use BackgroundTask::*; + // TODO: all of these tasks should return `eyre::Result`s match self { Migrations => run_migrations().await, IndexSearch => { - index_search(ro_pool, redis_pool, search_config).await + return index_search(ro_pool, redis_pool, search_backend).await; } ReleaseScheduled => release_scheduled(pool).await, UpdateVersions => update_versions(pool, redis_pool).await, @@ -77,6 +79,7 @@ impl BackgroundTask { run_email(email_queue).await; } } + Ok(()) } } @@ -122,14 +125,10 @@ pub async fn run_migrations() { pub async fn index_search( ro_pool: PgPool, redis_pool: RedisPool, - search_config: search::SearchConfig, -) { + search_backend: web::Data, +) -> eyre::Result<()> { info!("Indexing local database"); - let result = index_projects(ro_pool, redis_pool, &search_config).await; - if let Err(e) = result { - warn!("Local project indexing failed: {:?}", e); 
- } - info!("Done indexing local database"); + search_backend.index_projects(ro_pool, redis_pool).await } pub async fn release_scheduled(pool: PgPool) { diff --git a/apps/labrinth/src/env.rs b/apps/labrinth/src/env.rs index a78b167799..3c8244e252 100644 --- a/apps/labrinth/src/env.rs +++ b/apps/labrinth/src/env.rs @@ -82,6 +82,7 @@ where } pub fn init() -> eyre::Result<()> { + dotenvy::dotenv().ok(); EnvVars::from_env()?; LazyLock::force(&ENV); Ok(()) @@ -128,9 +129,6 @@ vars! { LABRINTH_EXTERNAL_NOTIFICATION_KEY: String; RATE_LIMIT_IGNORE_KEY: String; DATABASE_URL: String; - MEILISEARCH_READ_ADDR: String; - MEILISEARCH_WRITE_ADDRS: StringCsv; - MEILISEARCH_KEY: String; REDIS_URL: String; BIND_ADDR: String; SELF_ADDR: String; @@ -142,6 +140,17 @@ vars! { ALLOWED_CALLBACK_URLS: Json>; ANALYTICS_ALLOWED_ORIGINS: Json>; + // search + SEARCH_BACKEND: crate::search::SearchBackendKind; + MEILISEARCH_READ_ADDR: String; + MEILISEARCH_WRITE_ADDRS: StringCsv; + MEILISEARCH_KEY: String; + ELASTICSEARCH_URL: String; + ELASTICSEARCH_INDEX_PREFIX: String; + ELASTICSEARCH_USERNAME: String = ""; + ELASTICSEARCH_PASSWORD: String = ""; + ELASTICSEARCH_INDEX_CHUNK_SIZE: i64 = 5000i64; + // storage STORAGE_BACKEND: crate::file_hosting::FileHostKind; diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 4b6f484a95..4cf9aad313 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -56,7 +56,7 @@ pub struct LabrinthConfig { pub file_host: Arc, pub scheduler: Arc, pub ip_salt: Pepper, - pub search_config: search::SearchConfig, + pub search_backend: web::Data, pub session_queue: web::Data, pub payouts_queue: web::Data, pub analytics_queue: Arc, @@ -75,7 +75,7 @@ pub fn app_setup( pool: PgPool, ro_pool: ReadOnlyPgPool, redis_pool: RedisPool, - search_config: search::SearchConfig, + search_backend: actix_web::web::Data, clickhouse: &mut Client, file_host: Arc, stripe_client: stripe::Client, @@ -113,19 +113,22 @@ pub fn app_setup( let local_index_interval = 
Duration::from_secs(ENV.LOCAL_INDEX_INTERVAL); let pool_ref = pool.clone(); - let search_config_ref = search_config.clone(); let redis_pool_ref = redis_pool.clone(); + let search_backend_ref = search_backend.clone(); scheduler.run(local_index_interval, move || { let pool_ref = pool_ref.clone(); let redis_pool_ref = redis_pool_ref.clone(); - let search_config_ref = search_config_ref.clone(); + let search_backend = search_backend_ref.clone(); async move { - background_task::index_search( + if let Err(err) = background_task::index_search( pool_ref, redis_pool_ref, - search_config_ref, + search_backend, ) - .await; + .await + { + warn!("Failed to index search: {err:?}"); + } } }); @@ -269,7 +272,7 @@ pub fn app_setup( file_host, scheduler: Arc::new(scheduler), ip_salt, - search_config, + search_backend, session_queue, payouts_queue: web::Data::new(PayoutsQueue::new()), analytics_queue, @@ -307,7 +310,7 @@ pub fn app_config( .app_data(web::Data::new(labrinth_config.pool.clone())) .app_data(web::Data::new(labrinth_config.ro_pool.clone())) .app_data(web::Data::new(labrinth_config.file_host.clone())) - .app_data(web::Data::new(labrinth_config.search_config.clone())) + .app_data(labrinth_config.search_backend.clone()) .app_data(web::Data::new(labrinth_config.gotenberg_client.clone())) .app_data(labrinth_config.session_queue.clone()) .app_data(labrinth_config.payouts_queue.clone()) diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index d18984391c..89a3eba804 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -17,6 +17,7 @@ use labrinth::utoipa_app_config; use labrinth::{app_config, env}; use labrinth::{clickhouse, database, file_hosting}; use std::ffi::CStr; +use std::io; use std::sync::Arc; use tracing::{Instrument, info, info_span}; use tracing_actix_web::TracingLogger; @@ -56,7 +57,6 @@ struct Args { fn main() -> std::io::Result<()> { color_eyre::install().expect("failed to install `color-eyre`"); - dotenvy::dotenv().ok(); 
modrinth_util::log::init().expect("failed to initialize logging"); env::init().expect("failed to initialize environment variables"); @@ -152,7 +152,8 @@ async fn app() -> std::io::Result<()> { info!("Initializing clickhouse connection"); let mut clickhouse = clickhouse::init_client().await.unwrap(); - let search_config = search::SearchConfig::new(None); + let search_backend = + actix_web::web::Data::from(Arc::from(search::backend(None))); let stripe_client = stripe::Client::new(ENV.STRIPE_API_KEY.clone()); @@ -167,19 +168,20 @@ async fn app() -> std::io::Result<()> { if let Some(task) = args.run_background_task { info!("Running task {task:?} and exiting"); - task.run( - pool, - ro_pool.into_inner(), - redis_pool, - search_config, - clickhouse, - stripe_client, - anrok_client.clone(), - email_queue, - muralpay, - ) - .await; - return Ok(()); + return task + .run( + pool, + ro_pool.into_inner(), + redis_pool, + search_backend, + clickhouse, + stripe_client, + anrok_client.clone(), + email_queue, + muralpay, + ) + .await + .map_err(io::Error::other); } let prometheus = PrometheusMetricsBuilder::new("labrinth") @@ -206,7 +208,7 @@ async fn app() -> std::io::Result<()> { pool.clone(), ro_pool.clone(), redis_pool.clone(), - search_config.clone(), + search_backend.clone(), &mut clickhouse, file_host.clone(), stripe_client, diff --git a/apps/labrinth/src/queue/email.rs b/apps/labrinth/src/queue/email.rs index bb9a205468..8d012c1bab 100644 --- a/apps/labrinth/src/queue/email.rs +++ b/apps/labrinth/src/queue/email.rs @@ -102,8 +102,6 @@ impl Mailer { #[derive(Error, Debug)] pub enum MailError { - #[error("Environment Error")] - Env(#[from] dotenvy::Error), #[error("Mail Error: {0}")] Mail(#[from] lettre::error::Error), #[error("Address Parse Error: {0}")] @@ -136,7 +134,7 @@ impl EmailQueue { pg, redis, mailer: Arc::new(TokioMutex::new(Mailer::Uninitialized)), - identity: templates::MailingIdentity::from_env()?, + identity: templates::MailingIdentity::from_env(), client: 
Client::builder() .user_agent("Modrinth") .build() diff --git a/apps/labrinth/src/queue/email/templates.rs b/apps/labrinth/src/queue/email/templates.rs index e5cecdb3cc..e1d3b97d0c 100644 --- a/apps/labrinth/src/queue/email/templates.rs +++ b/apps/labrinth/src/queue/email/templates.rs @@ -95,8 +95,8 @@ pub struct MailingIdentity { } impl MailingIdentity { - pub fn from_env() -> dotenvy::Result { - Ok(Self { + pub fn from_env() -> Self { + Self { from_name: ENV.SMTP_FROM_NAME.clone(), from_address: ENV.SMTP_FROM_ADDRESS.clone(), reply_name: if ENV.SMTP_REPLY_TO_NAME.is_empty() { @@ -109,7 +109,7 @@ impl MailingIdentity { } else { Some(ENV.SMTP_REPLY_TO_ADDRESS.clone()) }, - }) + } } } diff --git a/apps/labrinth/src/routes/internal/admin.rs b/apps/labrinth/src/routes/internal/admin.rs index ecc230cfff..05cca02f29 100644 --- a/apps/labrinth/src/routes/internal/admin.rs +++ b/apps/labrinth/src/routes/internal/admin.rs @@ -7,7 +7,7 @@ use crate::models::pats::Scopes; use crate::queue::analytics::AnalyticsQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; -use crate::search::SearchConfig; +use crate::search::SearchBackend; use crate::util::date::get_current_tenths_of_ms; use crate::util::guards::admin_key_guard; use actix_web::{HttpRequest, HttpResponse, patch, post, web}; @@ -152,10 +152,12 @@ pub async fn count_download( pub async fn force_reindex( pool: web::Data, redis: web::Data, - config: web::Data, + search_backend: web::Data, ) -> Result { - use crate::search::indexing::index_projects; let redis = redis.get_ref(); - index_projects(pool.as_ref().clone(), redis.clone(), &config).await?; + search_backend + .index_projects(pool.as_ref().clone(), redis.clone()) + .await + .map_err(ApiError::Internal)?; Ok(HttpResponse::NoContent().finish()) } diff --git a/apps/labrinth/src/routes/internal/search.rs b/apps/labrinth/src/routes/internal/search.rs index 43bd21ecd7..86a3615d55 100644 --- a/apps/labrinth/src/routes/internal/search.rs +++ 
b/apps/labrinth/src/routes/internal/search.rs @@ -1,12 +1,9 @@ -use crate::routes::ApiError; -use crate::search::SearchConfig; use crate::util::guards::admin_key_guard; -use actix_web::{HttpResponse, delete, get, web}; -use meilisearch_sdk::tasks::{Task, TasksCancelQuery}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::time::Duration; -use utoipa::ToSchema; +use crate::{ + routes::ApiError, + search::{SearchBackend, TasksCancelFilter}, +}; +use actix_web::{delete, get, web}; pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) { cfg.service(tasks).service(tasks_cancel); @@ -15,107 +12,20 @@ pub fn config(cfg: &mut utoipa_actix_web::service_config::ServiceConfig) { #[utoipa::path] #[get("tasks", guard = "admin_key_guard")] pub async fn tasks( - config: web::Data, -) -> Result { - let client = config.make_batch_client()?; - let tasks = client - .with_all_clients("get_tasks", async |client| { - let tasks = client.get_tasks().await?; - - Ok(tasks.results) - }) - .await?; - - #[derive(Serialize, ToSchema)] - struct MeiliTask