From 22be5a902203f1872afaf06c6524f61a3b85ac02 Mon Sep 17 00:00:00 2001 From: aecsocket Date: Thu, 12 Feb 2026 14:23:51 +0000 Subject: [PATCH 01/30] initial elasticsearch impl --- Cargo.toml | 120 ++++++------ apps/labrinth/.env.local | 8 + apps/labrinth/Cargo.toml | 70 +++---- .../src/search/backend/elasticsearch.rs | 174 ++++++++++++++++++ .../src/search/backend/meilisearch.rs | 74 ++++++++ apps/labrinth/src/search/backend/mod.rs | 93 ++++++++++ apps/labrinth/src/search/mod.rs | 54 ++++++ docker-compose.yml | 100 +++++++++- 8 files changed, 581 insertions(+), 112 deletions(-) create mode 100644 apps/labrinth/src/search/backend/elasticsearch.rs create mode 100644 apps/labrinth/src/search/backend/meilisearch.rs create mode 100644 apps/labrinth/src/search/backend/mod.rs diff --git a/Cargo.toml b/Cargo.toml index d5d9ba9131..a8f5b65825 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,17 +1,17 @@ [workspace] resolver = "2" members = [ - "apps/app", - "apps/app-playground", - "apps/daedalus_client", - "apps/labrinth", - "packages/app-lib", - "packages/ariadne", - "packages/daedalus", - "packages/modrinth-log", - "packages/modrinth-maxmind", - "packages/modrinth-util", - "packages/path-util", + "apps/app", + "apps/app-playground", + "apps/daedalus_client", + "apps/labrinth", + "packages/app-lib", + "packages/ariadne", + "packages/daedalus", + "packages/modrinth-log", + "packages/modrinth-maxmind", + "packages/modrinth-util", + "packages/path-util", ] [workspace.package] @@ -34,12 +34,10 @@ ariadne = { path = "packages/ariadne" } async-compression = { version = "0.4.32", default-features = false } async-recursion = "1.1.1" async-stripe = { version = "0.41.0", default-features = false, features = [ - "runtime-tokio-hyper-rustls", + "runtime-tokio-hyper-rustls", ] } async-trait = "0.1.89" -async-tungstenite = { version = "0.31.0", default-features = false, features = [ - "futures-03-sink" -] } +async-tungstenite = { version = "0.31.0", default-features = false, features = ["futures-03-sink"] } async-walkdir = "2.1.0" async_zip = "0.0.18" base64 = "0.22.1" @@ -49,9 +47,7 @@ bytes = "1.10.1" censor = "0.3.0" chardetng = "0.1.17" chrono = "0.4.42" -cidre = { version = "0.11.3", default-features = false, features = [ - "macos_15_0" -] } +cidre = { version = "0.11.3", default-features = false, features = ["macos_15_0"] } clap = "4.5.48" clickhouse = "0.14.0" color-eyre = "0.6.5" @@ -82,10 +78,10 @@ hickory-resolver = "0.25.2" hmac = "0.12.1" hyper = "1.7.0" hyper-rustls = { version = "0.27.7", default-features = false, features = [ - "aws-lc-rs", - "http1", - "native-tokio", - "tls12", + "aws-lc-rs", + "http1", + "native-tokio", + "tls12", ] } hyper-util = "0.1.17" iana-time-zone = "0.1.64" @@ -96,18 +92,19 @@ itertools = "0.14.0" jemalloc_pprof = "0.8.1" json-patch = { version = "4.1.0", default-features = false } lettre = { version = "0.11.19", default-features = false, features = [ - "aws-lc-rs", - "builder", - "hostname", - "pool", - "rustls", - "rustls-native-certs", - "smtp-transport", - "tokio1", - "tokio1-rustls", + "aws-lc-rs", + "builder", + "hostname", + "pool", + "rustls", + "rustls-native-certs", + "smtp-transport", + "tokio1", + "tokio1-rustls", ] } maxminddb = "0.26.0" meilisearch-sdk = { version = "0.30.0", default-features = false } +elasticsearch = "9.1" modrinth-log = { path = "packages/modrinth-log" } modrinth-util = { path = "packages/modrinth-util" } muralpay = { path = "packages/muralpay" } @@ -124,32 +121,29 @@ png = "0.18.0" prometheus = "0.14.0" quartz_nbt = "0.2.9" quick-xml = "0.38.3" -rand = "=0.8.5" # Locked on 0.8 until argon2 and p256 update to 0.9 -rand_chacha = "=0.3.1" # Locked on 0.3 until we can update rand to 0.9 +rand = "=0.8.5" # Locked on 0.8 until argon2 and p256 update to 0.9 +rand_chacha = "=0.3.1" # Locked on 0.3 until we can update rand to 0.9 redis = "0.32.7" regex = "1.12.2" reqwest = { version = "0.12.24", default-features = false } rgb = "0.8.52" -rust_decimal = { version = "1.39.0", features = [ - "serde-with-float", - "serde-with-str" -] } +rust_decimal = { version = "1.39.0", features = ["serde-with-float", "serde-with-str"] } rust_iso3166 = "0.1.14" rust-s3 = { version = "0.37.0", default-features = false, features = [ - "fail-on-err", - "tags", - "tokio-rustls-tls", + "fail-on-err", + "tags", + "tokio-rustls-tls", ] } rustls = "0.23.32" rusty-money = "0.4.1" secrecy = "0.10.3" sentry = { version = "0.45.0", default-features = false, features = [ - "backtrace", - "contexts", - "debug-images", - "panic", - "reqwest", - "rustls", + "backtrace", + "contexts", + "debug-images", + "panic", + "reqwest", + "rustls", ] } serde = "1.0.228" serde_bytes = "0.11.19" @@ -157,7 +151,7 @@ serde_cbor = "0.11.2" serde_ini = "0.2.0" serde_json = "1.0.145" serde_with = "3.15.0" -serde-xml-rs = "0.8.1" # Also an XML (de)serializer, consider dropping yaserde in favor of this +serde-xml-rs = "0.8.1" # Also an XML (de)serializer, consider dropping yaserde in favor of this sha1 = "0.10.6" sha1_smol = { version = "1.0.1", features = ["std"] } sha2 = "0.10.9" @@ -177,8 +171,8 @@ tauri-plugin-opener = "2.5.0" tauri-plugin-os = "2.3.1" tauri-plugin-single-instance = "2.3.4" tauri-plugin-updater = { version = "2.9.0", default-features = false, features = [ - "rustls-tls", - "zip", + "rustls-tls", + "zip", ] } tauri-plugin-window-state = "2.4.0" tempfile = "3.23.0" @@ -204,19 +198,19 @@ utoipa-swagger-ui = { version = "9.0.2", features = ["actix-web", "vendored"] } uuid = "1.18.1" validator = "0.20.0" webp = { version = "0.3.1", default-features = false } -webview2-com = "0.38.0" # Should be updated in lockstep with wry +webview2-com = "0.38.0" # Should be updated in lockstep with wry whoami = "1.6.1" -windows = "=0.61.3" # Locked on 0.61 until we can update windows-core to 0.62 -windows-core = "=0.61.2" # Locked on 0.61 until webview2-com updates to 0.62 +windows = "=0.61.3" # Locked on 0.61 until we can update windows-core to 0.62 +windows-core = "=0.61.2" # Locked on 0.61 until webview2-com updates to 0.62 winreg = "0.55.0" woothee = "0.13.0" yaserde = "0.12.0" zbus = "5.11.0" zip = { version = "6.0.0", default-features = false, features = [ - "bzip2", - "deflate", - "deflate64", - "zstd", + "bzip2", + "deflate", + "deflate64", + "zstd", ] } zxcvbn = "3.1.0" @@ -261,14 +255,14 @@ opt-level = 3 # Optimize for speed and reduce size on release builds [profile.release] -opt-level = "s" # Optimize for binary size -strip = true # Remove debug symbols -lto = true # Enables link to optimizations -panic = "abort" # Strip expensive panic clean-up logic -codegen-units = 1 # Compile crates one after another so the compiler can optimize better +opt-level = "s" # Optimize for binary size +strip = true # Remove debug symbols +lto = true # Enables link to optimizations +panic = "abort" # Strip expensive panic clean-up logic +codegen-units = 1 # Compile crates one after another so the compiler can optimize better # Specific profile for labrinth production builds [profile.release-labrinth] inherits = "release" -strip = false # Keep debug symbols for Sentry -panic = "unwind" # Don't exit the whole app on panic in production +strip = false # Keep debug symbols for Sentry +panic = "unwind" # Don't exit the whole app on panic in production diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index 0040a36ba5..ea97af6507 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -16,6 +16,9 @@ DATABASE_URL=postgresql://labrinth:labrinth@localhost/labrinth DATABASE_MIN_CONNECTIONS=0 DATABASE_MAX_CONNECTIONS=16 +SEARCH_BACKEND=meilisearch + +# Meilisearch configuration MEILISEARCH_READ_ADDR=http://localhost:7700 MEILISEARCH_WRITE_ADDRS=http://localhost:7700 # 5 minutes in milliseconds @@ -26,6 +29,11 @@ SEARCH_OPERATION_TIMEOUT=300000 # MEILISEARCH_WRITE_ADDRS=http://localhost:7700,http://localhost:7701 MEILISEARCH_KEY=modrinth +MEILISEARCH_META_NAMESPACE= + +# Elasticsearch configuration +ELASTICSEARCH_URL=http://localhost:9200 +ELASTICSEARCH_INDEX_PREFIX=labrinth REDIS_URL=redis://localhost REDIS_MIN_CONNECTIONS=0 diff --git a/apps/labrinth/Cargo.toml b/apps/labrinth/Cargo.toml index 76cbff499e..f00b494c1c 100644 --- a/apps/labrinth/Cargo.toml +++ b/apps/labrinth/Cargo.toml @@ -21,12 +21,7 @@ actix-ws = { workspace = true } arc-swap = { workspace = true } argon2 = { workspace = true } ariadne = { workspace = true } -async-stripe = { workspace = true, features = [ - "billing", - "checkout", - "connect", - "webhook-events", -] } +async-stripe = { workspace = true, features = ["billing", "checkout", "connect", "webhook-events"] } async-trait = { workspace = true } base64 = { workspace = true } bitflags = { workspace = true } @@ -51,26 +46,27 @@ hmac = { workspace = true } hyper-rustls = { workspace = true } hyper-util = { workspace = true } image = { workspace = true, features = [ - "avif", - "bmp", - "dds", - "exr", - "ff", - "gif", - "hdr", - "ico", - "jpeg", - "png", - "pnm", - "qoi", - "tga", - "tiff", - "webp", + "avif", + "bmp", + "dds", + "exr", + "ff", + "gif", + "hdr", + "ico", + "jpeg", + "png", + "pnm", + "qoi", + "tga", + "tiff", + "webp", ] } itertools = { workspace = true } json-patch = { workspace = true } lettre = { workspace = true } meilisearch-sdk = { workspace = true, features = ["reqwest"] } +elasticsearch = { workspace = true } modrinth-util = { workspace = true, features = ["decimal", "sentry", "utoipa"] } muralpay = { workspace = true, features = ["client", "mock", "utoipa"] } murmur2 = { workspace = true } @@ -81,16 +77,8 @@ rand = { workspace = true } rand_chacha = { workspace = true } redis = { workspace = true, features = ["ahash", "r2d2", "tokio-comp"] } regex = { workspace = true } -reqwest = { workspace = true, features = [ - "http2", - "json", - "multipart", - "rustls-tls-webpki-roots", -] } -rust_decimal = { workspace = true, features = [ - "serde-with-float", - "serde-with-str", -] } +reqwest = { workspace = true, features = ["http2", "json", "multipart", "rustls-tls-webpki-roots"] } +rust_decimal = { workspace = true, features = ["serde-with-float", "serde-with-str"] } rust_iso3166 = { workspace = true } rust-s3 = { workspace = true } rustls.workspace = true @@ -103,14 +91,14 @@ sha1 = { workspace = true } sha2 = { workspace = true } spdx = { workspace = true, features = ["text"] } sqlx = { workspace = true, features = [ - "chrono", - "json", - "macros", - "migrate", - "postgres", - "runtime-tokio", - "rust_decimal", - "tls-rustls-aws-lc-rs", + "chrono", + "json", + "macros", + "migrate", + "postgres", + "runtime-tokio", + "rust_decimal", + "tls-rustls-aws-lc-rs", ] } sqlx-tracing = { workspace = true, features = ["postgres"] } strum = { workspace = true } @@ -145,8 +133,8 @@ iana-time-zone = { workspace = true } jemalloc_pprof = { workspace = true, features = ["flamegraph"] } tikv-jemalloc-ctl = { workspace = true, features = ["stats"] } tikv-jemallocator = { workspace = true, features = [ - "profiling", - "unprefixed_malloc_on_supported_platforms", + "profiling", + "unprefixed_malloc_on_supported_platforms", ] } [features] diff --git a/apps/labrinth/src/search/backend/elasticsearch.rs b/apps/labrinth/src/search/backend/elasticsearch.rs new file mode 100644 index 0000000000..17e5c842e4 --- /dev/null +++ b/apps/labrinth/src/search/backend/elasticsearch.rs @@ -0,0 +1,174 @@ +use crate::database::redis::RedisPool; +use crate::models::ids::VersionId; +use crate::search::backend::{ + BackendType, IndexingError, SearchBackend, TaskInfo, TaskStatus, + TasksCancelFilter, +}; +use crate::search::{ + ResultSearchProject, SearchRequest, SearchResults, UploadSearchProject, +}; +use ariadne::ids::to_base62; +use async_trait::async_trait; +use elasticsearch::http::transport::{ + SingleNodeConnectionPool, TransportBuilder, +}; +use elasticsearch::{Elasticsearch, SearchParts}; +use serde_json::json; +use tracing::info; + +pub struct ElasticsearchBackend { + pub client: Elasticsearch, + pub backend_type: BackendType, + pub index_prefix: String, +} + +impl ElasticsearchBackend { + pub fn new(url: &str, index_prefix: &str) -> Result { + let url_p = url.parse::().map_err(|e| e.to_string())?; + let conn_pool = SingleNodeConnectionPool::new(url_p); + let transport = TransportBuilder::new(conn_pool) + .build() + .map_err(|e| e.to_string())?; + let client = Elasticsearch::new(transport); + + Ok(Self { + client, + backend_type: BackendType::Elasticsearch, + index_prefix: index_prefix.to_string(), + }) + } + + fn get_projects_index_name(&self, suffix: Option<&str>) -> String { + if let Some(s) = suffix { + format!("{}_projects_{}", self.index_prefix, s) + } else { + format!("{}_projects", self.index_prefix) + } + } +} + +#[async_trait] +impl SearchBackend for ElasticsearchBackend { + async fn search( + &self, + request: &SearchRequest, + ) -> Result { + let offset: usize = request.offset.as_deref().unwrap_or("0").parse()?; + let index = request.index.as_deref().unwrap_or("relevance"); + let limit = request + .limit + .as_deref() + .unwrap_or("10") + .parse::()? + .min(100); + + let index_name = match index { + "relevance" => self.get_projects_index_name(Some("relevance")), + "downloads" => self.get_projects_index_name(Some("filtered")), + "follows" => self.get_projects_index_name(Some("relevance")), + "updated" => self.get_projects_index_name(Some("relevance")), + "newest" => self.get_projects_index_name(Some("relevance")), + i => return Err(SearchError::InvalidIndex(i.to_string())), + }; + + let sort_field = match index { + "downloads" => "downloads", + "follows" => "follows", + "updated" => "date_modified", + "newest" => "date_created", + _ => "_score", + }; + + let mut query_body = json!({ + "from": offset, + "size": limit, + "sort": [format!("{}:desc", sort_field)], + }); + + if let Some(q) = request.query.as_deref() { + if !q.is_empty() { + query_body["query"] = json!({ + "multi_match": { + "query": q, + "fields": ["name^3", "summary^2", "author"] + } + }); + } + } + + if let Some(f) = request.filters.as_deref() { + query_body["query"] = json!({ + "query_string": { + "query": f + } + }); + } + + info!("Executing Elasticsearch search on index: {}", index_name); + let response = self + .client + .search(SearchParts::Index(&[&index_name])) + .body(query_body) + .send() + .await + .map_err(|e| SearchError::Elasticsearch(e.to_string()))?; + + let response_body: serde_json::Value = + response.json().await.map_err(|e| SearchError::Serde(e))?; + + let hits = + response_body["hits"]["hits"].as_array().ok_or_else(|| { + SearchError::Serde("No hits array in response".into()) + })?; + + let total_hits = response_body["hits"]["total"] + .as_object() + .and_then(|o| o.get("value")) + .and_then(|v| v.as_i64()) + .unwrap_or(0) as usize; + + let results: Vec = hits + .iter() + .filter_map(|hit| hit["_source"].as_object().cloned()) + .map(|source| serde_json::from_value(source.into())) + .collect::, _>>() + .map_err(|e| SearchError::Serde(e))?; + + Ok(SearchResults { + hits: results, + page: (offset / limit) + 1, + hits_per_page: limit, + total_hits, + }) + } + + async fn index_projects( + &self, + ro_pool: sqlx::PgPool, + _redis: RedisPool, + ) -> Result<(), IndexingError> { + info!("Elasticsearch indexing not yet implemented"); + Ok(()) + } + + async fn remove_documents( + &self, + ids: &[VersionId], + ) -> Result<(), IndexingError> { + info!("Removing {} documents from Elasticsearch", ids.len()); + Ok(()) + } + + async fn get_tasks(&self) -> Result, IndexingError> { + info!("Getting Elasticsearch tasks not yet implemented"); + Ok(vec![]) + } + + async fn cancel_tasks( + &self, + _filter: TasksCancelFilter, + ) -> Result<(), IndexingError> { + info!("Cancelling Elasticsearch tasks not yet implemented"); + Ok(()) + } +} diff --git a/apps/labrinth/src/search/backend/meilisearch.rs b/apps/labrinth/src/search/backend/meilisearch.rs new file mode 100644 index 0000000000..1ba1602067 --- /dev/null +++ b/apps/labrinth/src/search/backend/meilisearch.rs @@ -0,0 +1,74 @@ +use crate::database::redis::RedisPool; +use crate::models::ids::VersionId; +use crate::models::projects::SearchRequest; +use crate::search::backend::{ + BackendType, IndexingError, SearchBackend, TaskInfo, TaskStatus, + TasksCancelFilter, +}; +use crate::search::indexing::cancel_tasks as meilisearch_cancel_tasks; +use crate::search::indexing::get_tasks as meilisearch_get_tasks; +use crate::search::indexing::index_projects as meilisearch_index_projects; +use crate::search::indexing::remove_documents as meilisearch_remove_documents; +use crate::search::search_for_project as meilisearch_search; +use crate::search::{ + SearchConfig, SearchError, SearchResults, UploadSearchProject, +}; +use async_trait::async_trait; + +pub struct MeilisearchBackend { + pub config: SearchConfig, + pub backend_type: BackendType, +} + +impl MeilisearchBackend { + pub fn new(config: SearchConfig) -> Self { + Self { + config, + backend_type: BackendType::Meilisearch, + } + } +} + +#[async_trait] +impl SearchBackend for MeilisearchBackend { + async fn search( + &self, + request: &SearchRequest, + ) -> Result { + meilisearch_search(request, &self.config).await + } + + async fn index_projects( + &self, + ro_pool: sqlx::PgPool, + redis: RedisPool, + ) -> Result<(), IndexingError> { + meilisearch_index_projects(ro_pool, redis, &self.config) + .await + .map_err(|e| IndexingError::Meilisearch(e.to_string())) + } + + async fn remove_documents( + &self, + ids: &[VersionId], + ) -> Result<(), IndexingError> { + meilisearch_remove_documents(ids, &self.config) + .await + .map_err(|e| IndexingError::Meilisearch(e.to_string())) + } + + async fn get_tasks(&self) -> Result, IndexingError> { + meilisearch_get_tasks(&self.config) + .await + .map_err(|e| IndexingError::Meilisearch(e.to_string())) + } + + async fn cancel_tasks( + &self, + filter: TasksCancelFilter, + ) -> Result<(), IndexingError> { + meilisearch_cancel_tasks(&self.config, filter) + .await + .map_err(|e| IndexingError::Meilisearch(e.to_string())) + } +} diff --git a/apps/labrinth/src/search/backend/mod.rs b/apps/labrinth/src/search/backend/mod.rs new file mode 100644 index 0000000000..64a45fb2cb --- /dev/null +++ b/apps/labrinth/src/search/backend/mod.rs @@ -0,0 +1,93 @@ +use crate::models::projects::SearchRequest; +use crate::{models::error::ApiError, search::SearchResults}; +use async_trait::async_trait; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BackendType { + Meilisearch, + Elasticsearch, +} + +#[derive(Debug, Clone)] +pub struct TasksCancelFilter { + pub index_name: Option, + pub cancel_all: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskInfo { + pub uid: u32, + pub status: TaskStatus, + pub duration: Option, + pub enqueued_at: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TaskStatus { + Enqueued, + Processing, + Failed, + Succeeded, +} + +#[async_trait] +pub trait SearchBackend: Send + Sync { + async fn search( + &self, + request: &SearchRequest, + ) -> Result; + + async fn index_projects( + &self, + ro_pool: sqlx::PgPool, + redis: crate::database::redis::RedisPool, + ) -> Result<(), IndexingError>; + + async fn remove_documents( + &self, + ids: &[crate::models::ids::VersionId], + ) -> Result<(), IndexingError>; + + async fn get_tasks(&self) -> Result, IndexingError>; + + async fn cancel_tasks( + &self, + filter: TasksCancelFilter, + ) -> Result<(), IndexingError>; +} + +#[derive(Error, Debug)] +pub enum SearchError { + #[error("Meilisearch Error: {0}")] + Meilisearch(#[from] meilisearch_sdk::errors::Error), + #[error("Elasticsearch Error: {0}")] + Elasticsearch(String), + #[error("Error while serializing or deserializing JSON: {0}")] + Serde(#[from] serde_json::Error), + #[error("Error while parsing an integer: {0}")] + IntParsing(#[from] std::num::ParseIntError), + #[error("Error while formatting strings: {0}")] + FormatError(#[from] std::fmt::Error), + #[error("Environment Error")] + Env(#[from] dotenvy::Error), + #[error("Invalid index to sort by: {0}")] + InvalidIndex(String), + #[error("Unknown backend type: {0}")] + UnknownBackend(String), +} + +#[derive(Error, Debug)] +pub enum IndexingError { + #[error("Meilisearch Indexing Error: {0}")] + Meilisearch(String), + #[error("Elasticsearch Indexing Error: {0}")] + Elasticsearch(String), + #[error("Database Error: {0}")] + Database(#[from] sqlx::Error), + #[error("Error while serializing or deserializing JSON: {0}")] + Serde(#[from] serde_json::Error), + #[error("Error while awaiting index creation task")] + Task, + #[error("Environment Error")] + Env(#[from] dotenvy::Error), +} diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 636abd9de7..0a4233abb8 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -16,12 +16,15 @@ use std::fmt::Write; use thiserror::Error; use tracing::{Instrument, info_span}; +pub mod backend; pub mod indexing; #[derive(Error, Debug)] pub enum SearchError { #[error("MeiliSearch Error: {0}")] MeiliSearch(#[from] meilisearch_sdk::errors::Error), + #[error("Elasticsearch Error: {0}")] + Elasticsearch(String), #[error("Error while serializing or deserializing JSON: {0}")] Serde(#[from] serde_json::Error), #[error("Error while parsing an integer: {0}")] @@ -32,6 +35,8 @@ pub enum SearchError { Env(#[from] dotenvy::Error), #[error("Invalid index to sort by: {0}")] InvalidIndex(String), + #[error("Unknown backend type: {0}")] + UnknownBackend(String), } impl actix_web::ResponseError for SearchError { @@ -39,10 +44,14 @@ impl actix_web::ResponseError for SearchError { match self { SearchError::Env(..) => StatusCode::INTERNAL_SERVER_ERROR, SearchError::MeiliSearch(..) => StatusCode::BAD_REQUEST, + SearchError::Elasticsearch(..) => StatusCode::BAD_REQUEST, SearchError::Serde(..) => StatusCode::BAD_REQUEST, SearchError::IntParsing(..) => StatusCode::BAD_REQUEST, SearchError::InvalidIndex(..) => StatusCode::BAD_REQUEST, SearchError::FormatError(..) => StatusCode::BAD_REQUEST, + SearchError::UnknownBackend(..) => { + StatusCode::INTERNAL_SERVER_ERROR + } } } @@ -51,10 +60,12 @@ impl actix_web::ResponseError for SearchError { error: match self { SearchError::Env(..) => "environment_error", SearchError::MeiliSearch(..) => "meilisearch_error", + SearchError::Elasticsearch(..) => "elasticsearch_error", SearchError::Serde(..) => "invalid_input", SearchError::IntParsing(..) => "invalid_input", SearchError::InvalidIndex(..) => "invalid_input", SearchError::FormatError(..) => "invalid_input", + SearchError::UnknownBackend(..) => "internal_error", }, description: self.to_string(), details: None, @@ -266,6 +277,49 @@ pub fn get_sort_index( }) } +pub fn get_backend() -> Result< + Box, + SearchError, +> { + use crate::search::backend::SearchBackend; + use crate::search::backend::{ + BackendType, ElasticsearchBackend, MeilisearchBackend, + }; + + let backend_type_str = dotenvy::var("SEARCH_BACKEND") + .unwrap_or_else(|_| "meilisearch".to_string()); + + let backend_type = match backend_type_str.as_str() { + "elasticsearch" => BackendType::Elasticsearch, + _ => BackendType::Meilisearch, + }; + + match backend_type { + BackendType::Elasticsearch => { + let url = dotenvy::var("ELASTICSEARCH_URL") + .unwrap_or_else(|_| "http://localhost:9200".to_string()); + let index_prefix = dotenvy::var("ELASTICSEARCH_INDEX_PREFIX") + .unwrap_or_else(|_| "labrinth".to_string()); + + info!( + "Using Elasticsearch backend at {} with prefix {}", + url, index_prefix + ); + let backend = ElasticsearchBackend::new(&url, &index_prefix) + .map_err(|e| SearchError::Elasticsearch(e))?; + Ok(Box::new(backend)) + } + BackendType::Meilisearch => { + let meta_namespace = + dotenvy::var("MEILISEARCH_META_NAMESPACE").ok(); + info!("Using Meilisearch backend"); + let config = SearchConfig::new(meta_namespace); + let backend = MeilisearchBackend::new(config); + Ok(Box::new(backend)) + } + } +} + pub async fn search_for_project( info: &SearchRequest, config: &SearchConfig, diff --git a/docker-compose.yml b/docker-compose.yml index 2d0b6958cd..df769000fa 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,6 +37,87 @@ services: interval: 3s timeout: 5s retries: 3 + elasticsearch0: + image: elasticsearch:9.3.0 + container_name: labrinth-elasticsearch0 + networks: + - elasticsearch-mesh + restart: on-failure + ports: + - '127.0.0.1:9200:9200' + volumes: + - elasticsearch0-data:/usr/share/elasticsearch/data + environment: + - discovery.type=single-node + - cluster.name=labrinth + - node.name=elasticsearch0 + - xpack.security.enabled=false + - 'ES_JAVA_OPTS=-Xms1g -Xmx1g' + - bootstrap.memory_lock=true + - logger.level=WARN + healthcheck: + test: + [ + 'CMD-SHELL', + 'curl -s http://localhost:9200/_cluster/health | grep -q green', + ] + interval: 10s + timeout: 5s + retries: 10 + elasticsearch1: + image: elasticsearch:9.3.0 + container_name: labrinth-elasticsearch1 + networks: + - elasticsearch-mesh + restart: on-failure + ports: + - '127.0.0.1:9201:9200' + volumes: + - elasticsearch1-data:/usr/share/elasticsearch/data + environment: + - discovery.type=single-node + - cluster.name=labrinth + - node.name=elasticsearch1 + - xpack.security.enabled=false + - 'ES_JAVA_OPTS=-Xms1g -Xmx1g' + - bootstrap.memory_lock=true + - logger.level=WARN + healthcheck: + test: + [ + 'CMD-SHELL', + 'curl -s http://localhost:9200/_cluster/health | grep -q green', + ] + interval: 10s + timeout: 5s + retries: 10 + elasticsearch2: + image: elasticsearch:9.3.0 + container_name: labrinth-elasticsearch2 + networks: + - elasticsearch-mesh + restart: on-failure + ports: + - '127.0.0.1:9202:9200' + volumes: + - elasticsearch2-data:/usr/share/elasticsearch/data + environment: + - discovery.type=single-node + - cluster.name=labrinth + - node.name=elasticsearch2 + - xpack.security.enabled=false + - 'ES_JAVA_OPTS=-Xms1g -Xmx1g' + - bootstrap.memory_lock=true + - logger.level=WARN + healthcheck: + test: + [ + 'CMD-SHELL', + 'curl -s http://localhost:9200/_cluster/health | grep -q green', + ] + interval: 10s + timeout: 5s + retries: 10 redis: image: redis:alpine container_name: labrinth-redis @@ -89,8 +170,6 @@ services: ports: - '127.0.0.1:13000:3000' extra_hosts: - # Gotenberg must send a message on a webhook to our backend, - # so it must have access to our local network - 'host.docker.internal:host-gateway' labrinth: profiles: @@ -109,6 +188,12 @@ services: condition: service_healthy meilisearch: condition: service_healthy + elasticsearch0: + condition: service_healthy + elasticsearch1: + condition: service_healthy + elasticsearch2: + condition: service_healthy redis: condition: service_healthy clickhouse: @@ -136,14 +221,9 @@ services: timeout: 5s retries: 3 volumes: - # Labrinth deposits version files here; - # Delphi reads them from here - /tmp/modrinth:/tmp/modrinth:ro,z extra_hosts: - # Delphi must send a message on a webhook to our backend, - # so it must have access to our local network - 'host.docker.internal:host-gateway' - # Sharded Meilisearch meilisearch1: profiles: @@ -166,7 +246,6 @@ services: interval: 3s timeout: 5s retries: 3 - nginx-meilisearch-lb: profiles: - sharded-meilisearch @@ -186,9 +265,14 @@ services: networks: meilisearch-mesh: driver: bridge + elasticsearch-mesh: + driver: bridge volumes: meilisearch-data: meilisearch1-data: + elasticsearch0-data: + elasticsearch1-data: + elasticsearch2-data: db-data: redis-data: labrinth-cdn-data: From 31362c28114798b703562f5348ab7352893cdd92 Mon Sep 17 00:00:00 2001 From: aecsocket Date: Thu, 12 Feb 2026 16:00:39 +0000 Subject: [PATCH 02/30] working elastic cluster --- docker-compose.yml | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index df769000fa..9d96bd1677 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,13 +48,14 @@ services: volumes: - elasticsearch0-data:/usr/share/elasticsearch/data environment: - - discovery.type=single-node - - cluster.name=labrinth + - logger.level=WARN - node.name=elasticsearch0 + - cluster.name=labrinth + - cluster.initial_master_nodes=elasticsearch0,elasticsearch1,elasticsearch2 + - discovery.seed_hosts=elasticsearch1,elasticsearch2 + - bootstrap.memory_lock=false - xpack.security.enabled=false - - 'ES_JAVA_OPTS=-Xms1g -Xmx1g' - - bootstrap.memory_lock=true - - logger.level=WARN + mem_limit: 1g healthcheck: test: [ @@ -70,23 +71,22 @@ services: networks: - elasticsearch-mesh restart: on-failure - ports: - - '127.0.0.1:9201:9200' volumes: - elasticsearch1-data:/usr/share/elasticsearch/data environment: - - discovery.type=single-node - - cluster.name=labrinth + - logger.level=WARN - node.name=elasticsearch1 + - cluster.name=labrinth + - cluster.initial_master_nodes=elasticsearch0,elasticsearch1,elasticsearch2 + - discovery.seed_hosts=elasticsearch0,elasticsearch2 + - bootstrap.memory_lock=false - xpack.security.enabled=false - - 'ES_JAVA_OPTS=-Xms1g -Xmx1g' - - bootstrap.memory_lock=true - - logger.level=WARN + mem_limit: 1g healthcheck: test: [ 'CMD-SHELL', - 'curl -s http://localhost:9200/_cluster/health | grep -q green', + 'curl -s http://localhost:9201/_cluster/health | grep -q green', ] interval: 10s timeout: 5s @@ -97,23 +97,22 @@ services: networks: - elasticsearch-mesh restart: on-failure - ports: - - '127.0.0.1:9202:9200' volumes: - elasticsearch2-data:/usr/share/elasticsearch/data environment: - - discovery.type=single-node - - cluster.name=labrinth + - logger.level=WARN - node.name=elasticsearch2 + - cluster.name=labrinth + - cluster.initial_master_nodes=elasticsearch0,elasticsearch1,elasticsearch2 + - discovery.seed_hosts=elasticsearch0,elasticsearch1 + - bootstrap.memory_lock=false - xpack.security.enabled=false - - 'ES_JAVA_OPTS=-Xms1g -Xmx1g' - - bootstrap.memory_lock=true - - logger.level=WARN + mem_limit: 1g healthcheck: test: [ 'CMD-SHELL', - 'curl -s http://localhost:9200/_cluster/health | grep -q green', + 'curl -s http://localhost:9202/_cluster/health | grep -q green', ] interval: 10s timeout: 5s From d3c76fadee4278274a2b99cc65a4558fc2f3f94f Mon Sep 17 00:00:00 2001 From: aecsocket Date: Thu, 19 Feb 2026 16:36:58 +0000 Subject: [PATCH 03/30] replace SearchError with ApiError for preparation of search backend --- Cargo.lock | 24 ++++ Cargo.toml | 121 ++++++++++--------- apps/labrinth/src/routes/v2/projects.rs | 4 +- apps/labrinth/src/routes/v3/projects.rs | 7 +- apps/labrinth/src/search/backend/mod.rs | 48 +++++++- apps/labrinth/src/search/mod.rs | 153 ++++++------------------ 6 files changed, 174 insertions(+), 183 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8245cfff0d..253653b0eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2631,6 +2631,29 @@ dependencies = [ "serde", ] +[[package]] +name = "elasticsearch" +version = "9.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12bb303aa6e1d28c0c86b6fbfe484fd0fd3f512629aeed1ac4f6b85f81d9834a" +dependencies = [ + "base64 0.22.1", + "bytes", + "dyn-clone", + "flate2", + "lazy_static", + "parking_lot", + "percent-encoding", + "reqwest", + "rustc_version", + "serde", + "serde_json", + "serde_with", + "tokio", + "url", + "void", +] + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -4748,6 +4771,7 @@ dependencies = [ "dotenv-build", "dotenvy", "either", + "elasticsearch", "eyre", "futures", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index a8f5b65825..68ff994185 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,17 +1,17 @@ [workspace] resolver = "2" members = [ - "apps/app", - "apps/app-playground", - "apps/daedalus_client", - "apps/labrinth", - "packages/app-lib", - "packages/ariadne", - "packages/daedalus", - "packages/modrinth-log", - "packages/modrinth-maxmind", - "packages/modrinth-util", - "packages/path-util", + "apps/app", + "apps/app-playground", + "apps/daedalus_client", + "apps/labrinth", + "packages/app-lib", + "packages/ariadne", + "packages/daedalus", + "packages/modrinth-log", + "packages/modrinth-maxmind", + "packages/modrinth-util", + "packages/path-util", ] [workspace.package] @@ -34,10 +34,12 @@ ariadne = { path = "packages/ariadne" } async-compression = { version = "0.4.32", default-features = false } async-recursion = "1.1.1" async-stripe = { version = "0.41.0", default-features = false, features = [ - "runtime-tokio-hyper-rustls", + "runtime-tokio-hyper-rustls", ] } async-trait = "0.1.89" -async-tungstenite = { version = "0.31.0", default-features = false, features = ["futures-03-sink"] } +async-tungstenite = { version = "0.31.0", default-features = false, features = [ + "futures-03-sink" +] } async-walkdir = "2.1.0" async_zip = "0.0.18" base64 = "0.22.1" @@ -47,7 +49,9 @@ bytes = "1.10.1" censor = "0.3.0" chardetng = "0.1.17" chrono = "0.4.42" -cidre = { version = "0.11.3", default-features = false, features = ["macos_15_0"] } +cidre = { version = "0.11.3", default-features = false, features = [ + "macos_15_0" +] } clap = "4.5.48" clickhouse = "0.14.0" color-eyre = "0.6.5" @@ -65,6 +69,7 @@ dotenv-build = "0.1.1" dotenvy = "0.15.7" dunce = "1.0.5" either = "1.15.0" +elasticsearch = "9.1.0-alpha.1" encoding_rs = "0.8.35" enumset = "1.1.10" eyre = "0.6.12" @@ -78,10 +83,10 @@ hickory-resolver = "0.25.2" hmac = "0.12.1" hyper = "1.7.0" hyper-rustls = { version = "0.27.7", default-features = false, features = [ - "aws-lc-rs", - "http1", - "native-tokio", - "tls12", + "aws-lc-rs", + "http1", + "native-tokio", + "tls12", ] } hyper-util = "0.1.17" iana-time-zone = "0.1.64" @@ -92,19 +97,18 @@ itertools = "0.14.0" jemalloc_pprof = "0.8.1" json-patch = { version = "4.1.0", default-features = false } lettre = { version = "0.11.19", default-features = false, features = [ - "aws-lc-rs", - "builder", - "hostname", - "pool", - "rustls", - "rustls-native-certs", - "smtp-transport", - "tokio1", - "tokio1-rustls", + "aws-lc-rs", + "builder", + "hostname", + "pool", + "rustls", + "rustls-native-certs", + "smtp-transport", + "tokio1", + "tokio1-rustls", ] } maxminddb = "0.26.0" meilisearch-sdk = { version = "0.30.0", default-features = false } -elasticsearch = "9.1" modrinth-log = { path = "packages/modrinth-log" } modrinth-util = { path = "packages/modrinth-util" } muralpay = { path = "packages/muralpay" } @@ -121,29 +125,32 @@ png = "0.18.0" prometheus = "0.14.0" quartz_nbt = "0.2.9" quick-xml = "0.38.3" -rand = "=0.8.5" # Locked on 0.8 until argon2 and p256 update to 0.9 -rand_chacha = "=0.3.1" # Locked on 0.3 until we can update rand to 0.9 +rand = "=0.8.5" # Locked on 0.8 until argon2 and p256 update to 0.9 +rand_chacha = "=0.3.1" # Locked on 0.3 until we can update rand to 0.9 redis = "0.32.7" regex = "1.12.2" reqwest = { version = "0.12.24", default-features = false } rgb = "0.8.52" -rust_decimal = { version = "1.39.0", features = ["serde-with-float", "serde-with-str"] } +rust_decimal = { version = "1.39.0", features = [ + "serde-with-float", + "serde-with-str" +] } rust_iso3166 = "0.1.14" rust-s3 = { version = "0.37.0", default-features = false, features = [ - "fail-on-err", - "tags", - "tokio-rustls-tls", + "fail-on-err", + "tags", + "tokio-rustls-tls", ] } rustls = "0.23.32" rusty-money = "0.4.1" secrecy = "0.10.3" sentry = { version = "0.45.0", default-features = false, features = [ - "backtrace", - "contexts", - "debug-images", - "panic", - "reqwest", - "rustls", + "backtrace", + "contexts", + "debug-images", + "panic", + "reqwest", + "rustls", ] } serde = "1.0.228" serde_bytes = "0.11.19" @@ -151,7 +158,7 @@ serde_cbor = "0.11.2" serde_ini = "0.2.0" serde_json = "1.0.145" serde_with = "3.15.0" -serde-xml-rs = "0.8.1" # Also an XML (de)serializer, consider dropping yaserde in favor of this +serde-xml-rs = "0.8.1" # Also an XML (de)serializer, consider dropping yaserde in favor of this sha1 = "0.10.6" sha1_smol = { version = "1.0.1", features = ["std"] } sha2 = "0.10.9" @@ -171,8 +178,8 @@ tauri-plugin-opener = "2.5.0" tauri-plugin-os = "2.3.1" tauri-plugin-single-instance = "2.3.4" tauri-plugin-updater = { version = "2.9.0", default-features = false, features = [ - "rustls-tls", - "zip", + "rustls-tls", + "zip", ] } tauri-plugin-window-state = "2.4.0" tempfile = "3.23.0" @@ -198,19 +205,19 @@ utoipa-swagger-ui = { version = "9.0.2", features = ["actix-web", "vendored"] } uuid = "1.18.1" validator = "0.20.0" webp = { version = "0.3.1", default-features = false } -webview2-com = "0.38.0" # Should be updated in lockstep with wry +webview2-com = "0.38.0" # Should be updated in lockstep with wry whoami = "1.6.1" -windows = "=0.61.3" # Locked on 0.61 until we can update windows-core to 0.62 -windows-core = "=0.61.2" # Locked on 0.61 until webview2-com updates to 0.62 +windows = "=0.61.3" # Locked on 0.61 until we can update windows-core to 0.62 +windows-core = "=0.61.2" # Locked on 0.61 until webview2-com updates to 0.62 winreg = "0.55.0" woothee = "0.13.0" yaserde = "0.12.0" zbus = "5.11.0" zip = { version = "6.0.0", default-features = false, features = [ - "bzip2", - "deflate", - "deflate64", - "zstd", + "bzip2", + "deflate", + "deflate64", + "zstd", ] } zxcvbn = "3.1.0" @@ -255,14 +262,14 @@ opt-level = 3 # Optimize for speed and reduce size on release builds [profile.release] -opt-level = "s" # Optimize for binary size -strip = true # Remove debug symbols -lto = true # Enables link to optimizations -panic = "abort" # Strip expensive panic clean-up logic -codegen-units = 1 # Compile crates one after another so the compiler can optimize better +opt-level = "s" # Optimize for binary size +strip = true # Remove debug symbols +lto = true # Enables link to optimizations +panic = "abort" # Strip expensive panic clean-up logic +codegen-units = 1 # Compile crates one after another so the compiler can optimize better # Specific profile for labrinth production builds [profile.release-labrinth] inherits = "release" -strip = false # Keep debug symbols for Sentry -panic = "unwind" # Don't exit the whole app on panic in production +strip = false # Keep debug symbols for Sentry +panic = "unwind" # Don't exit the whole app on panic in production diff --git a/apps/labrinth/src/routes/v2/projects.rs b/apps/labrinth/src/routes/v2/projects.rs index 1390b4c63a..fbbf01f4f1 100644 --- a/apps/labrinth/src/routes/v2/projects.rs +++ b/apps/labrinth/src/routes/v2/projects.rs @@ -14,7 +14,7 @@ use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::session::AuthQueue; use crate::routes::v3::projects::ProjectIds; use crate::routes::{ApiError, v2_reroute, v3}; -use crate::search::{SearchConfig, SearchError, search_for_project}; +use crate::search::{SearchConfig, search_for_project}; use actix_web::{HttpRequest, HttpResponse, delete, get, patch, post, web}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -54,7 +54,7 @@ pub fn config(cfg: &mut web::ServiceConfig) { pub async fn project_search( web::Query(info): web::Query, config: web::Data, -) -> Result { +) -> Result { // Search now uses loader_fields instead of explicit 'client_side' and 'server_side' fields // While the backend for this has changed, it doesnt affect much // in the API calls except that 'versions:x' is now 'game_versions:x' diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index f7f4da3f25..559453d9f2 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -30,8 +30,7 @@ use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::routes::internal::delphi; use crate::search::indexing::remove_documents; -use crate::search::{SearchConfig, SearchError, search_for_project}; -use crate::util::error::Context; +use crate::search::{SearchConfig, SearchResults, search_for_project}; use crate::util::img; use crate::util::img::{delete_old_images, upload_image_optimized}; use crate::util::routes::read_limited_from_payload; @@ -1040,7 +1039,7 @@ pub async fn edit_project_categories( pub async fn project_search( web::Query(info): web::Query, config: web::Data, -) -> Result { +) -> Result, ApiError> { let results = search_for_project(&info, &config).await?; // TODO: add this back @@ -1055,7 +1054,7 @@ pub async fn project_search( // total_hits: results.total_hits, // }; - Ok(HttpResponse::Ok().json(results)) + Ok(web::Json(results)) } //checks the validity of a project id or slug diff --git a/apps/labrinth/src/search/backend/mod.rs b/apps/labrinth/src/search/backend/mod.rs index 64a45fb2cb..db1314d2f3 100644 --- a/apps/labrinth/src/search/backend/mod.rs +++ b/apps/labrinth/src/search/backend/mod.rs @@ -1,9 +1,10 @@ use crate::models::projects::SearchRequest; -use crate::{models::error::ApiError, search::SearchResults}; +use crate::search::SearchResults; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum BackendType { +pub enum BackendKind { Meilisearch, Elasticsearch, } @@ -91,3 +92,46 @@ pub enum IndexingError { #[error("Environment Error")] Env(#[from] dotenvy::Error), } + +pub fn get_backend() -> Result< + Box, + SearchError, +> { + use crate::search::backend::SearchBackend; + use crate::search::backend::{ + BackendKind, ElasticsearchBackend, MeilisearchBackend, + }; + + let backend_type_str = dotenvy::var("SEARCH_BACKEND") + .unwrap_or_else(|_| "meilisearch".to_string()); + + let backend_type = match backend_type_str.as_str() { + "elasticsearch" => BackendKind::Elasticsearch, + _ => BackendKind::Meilisearch, + }; + + match backend_type { + BackendKind::Elasticsearch => { + let url = dotenvy::var("ELASTICSEARCH_URL") + .unwrap_or_else(|_| "http://localhost:9200".to_string()); + let index_prefix = dotenvy::var("ELASTICSEARCH_INDEX_PREFIX") + .unwrap_or_else(|_| "labrinth".to_string()); + + info!( + "Using Elasticsearch backend at {} with prefix {}", + url, index_prefix + ); + let backend = ElasticsearchBackend::new(&url, &index_prefix) + .map_err(|e| SearchError::Elasticsearch(e))?; + Ok(Box::new(backend)) + } + BackendKind::Meilisearch => { + let meta_namespace = + dotenvy::var("MEILISEARCH_META_NAMESPACE").ok(); + info!("Using Meilisearch backend"); + let config = SearchConfig::new(meta_namespace); + let backend = MeilisearchBackend::new(config); + Ok(Box::new(backend)) + } + } +} diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 0a4233abb8..34377f5b96 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -1,9 +1,10 @@ use crate::env::ENV; use crate::models::projects::SearchRequest; -use crate::{models::error::ApiError, search::indexing::IndexingError}; -use actix_web::HttpResponse; -use actix_web::http::StatusCode; +use crate::routes::ApiError; +use crate::search::indexing::IndexingError; +use crate::util::error::Context; use chrono::{DateTime, Utc}; +use eyre::eyre; use futures::TryStreamExt; use futures::stream::FuturesOrdered; use itertools::Itertools; @@ -13,66 +14,11 @@ use serde_json::Value; use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Write; -use thiserror::Error; use tracing::{Instrument, info_span}; -pub mod backend; +// pub mod backend; pub mod indexing; -#[derive(Error, Debug)] -pub enum SearchError { - #[error("MeiliSearch Error: {0}")] - MeiliSearch(#[from] meilisearch_sdk::errors::Error), - #[error("Elasticsearch Error: {0}")] - Elasticsearch(String), - #[error("Error while serializing or deserializing JSON: {0}")] - Serde(#[from] serde_json::Error), - #[error("Error while parsing an integer: {0}")] - IntParsing(#[from] std::num::ParseIntError), - #[error("Error while formatting strings: {0}")] - FormatError(#[from] std::fmt::Error), - #[error("Environment Error")] - Env(#[from] dotenvy::Error), - #[error("Invalid index to sort by: {0}")] - InvalidIndex(String), - #[error("Unknown backend type: {0}")] - UnknownBackend(String), -} - -impl actix_web::ResponseError for SearchError { - fn status_code(&self) -> StatusCode { - match self { - SearchError::Env(..) => StatusCode::INTERNAL_SERVER_ERROR, - SearchError::MeiliSearch(..) => StatusCode::BAD_REQUEST, - SearchError::Elasticsearch(..) => StatusCode::BAD_REQUEST, - SearchError::Serde(..) => StatusCode::BAD_REQUEST, - SearchError::IntParsing(..) => StatusCode::BAD_REQUEST, - SearchError::InvalidIndex(..) => StatusCode::BAD_REQUEST, - SearchError::FormatError(..) => StatusCode::BAD_REQUEST, - SearchError::UnknownBackend(..) => { - StatusCode::INTERNAL_SERVER_ERROR - } - } - } - - fn error_response(&self) -> HttpResponse { - HttpResponse::build(self.status_code()).json(ApiError { - error: match self { - SearchError::Env(..) => "environment_error", - SearchError::MeiliSearch(..) => "meilisearch_error", - SearchError::Elasticsearch(..) => "elasticsearch_error", - SearchError::Serde(..) => "invalid_input", - SearchError::IntParsing(..) => "invalid_input", - SearchError::InvalidIndex(..) => "invalid_input", - SearchError::FormatError(..) => "invalid_input", - SearchError::UnknownBackend(..) => "internal_error", - }, - description: self.to_string(), - details: None, - }) - } -} - #[derive(Debug, Clone)] pub struct MeilisearchReadClient { pub client: Client, @@ -263,79 +209,47 @@ pub struct ResultSearchProject { pub fn get_sort_index( config: &SearchConfig, index: &str, -) -> Result<(String, [&'static str; 1]), SearchError> { +) -> Result<(String, &'static [&'static str]), ApiError> { let projects_name = config.get_index_name("projects", false); let projects_filtered_name = config.get_index_name("projects_filtered", false); Ok(match index { - "relevance" => (projects_name, ["downloads:desc"]), - "downloads" => (projects_filtered_name, ["downloads:desc"]), - "follows" => (projects_name, ["follows:desc"]), - "updated" => (projects_name, ["date_modified:desc"]), - "newest" => (projects_name, ["date_created:desc"]), - i => return Err(SearchError::InvalidIndex(i.to_string())), + "relevance" => (projects_name, &["downloads:desc"]), + "downloads" => (projects_filtered_name, &["downloads:desc"]), + "follows" => (projects_name, &["follows:desc"]), + "updated" => (projects_name, &["date_modified:desc"]), + "newest" => (projects_name, &["date_created:desc"]), + _ => return Err(ApiError::Request(eyre!("invalid index `{index}`"))), }) } -pub fn get_backend() -> Result< - Box, - SearchError, -> { - use crate::search::backend::SearchBackend; - use crate::search::backend::{ - BackendType, ElasticsearchBackend, MeilisearchBackend, - }; - - let backend_type_str = dotenvy::var("SEARCH_BACKEND") - .unwrap_or_else(|_| "meilisearch".to_string()); - - let backend_type = match backend_type_str.as_str() { - "elasticsearch" => BackendType::Elasticsearch, - _ => BackendType::Meilisearch, - }; - - match backend_type { - BackendType::Elasticsearch => { - let url = dotenvy::var("ELASTICSEARCH_URL") - .unwrap_or_else(|_| "http://localhost:9200".to_string()); - let index_prefix = dotenvy::var("ELASTICSEARCH_INDEX_PREFIX") - .unwrap_or_else(|_| "labrinth".to_string()); - - info!( - "Using Elasticsearch backend at {} with prefix {}", - url, index_prefix - ); - let backend = ElasticsearchBackend::new(&url, &index_prefix) - .map_err(|e| SearchError::Elasticsearch(e))?; - Ok(Box::new(backend)) - } - BackendType::Meilisearch => { - let meta_namespace = - dotenvy::var("MEILISEARCH_META_NAMESPACE").ok(); - info!("Using Meilisearch backend"); - let config = SearchConfig::new(meta_namespace); - let backend = MeilisearchBackend::new(config); - Ok(Box::new(backend)) - } - } -} - pub async fn search_for_project( info: &SearchRequest, config: &SearchConfig, -) -> Result { - let offset: usize = info.offset.as_deref().unwrap_or("0").parse()?; +) -> Result { + let offset: usize = info + .offset + .as_deref() + .unwrap_or("0") + .parse() + .wrap_request_err("invalid offset")?; let index = info.index.as_deref().unwrap_or("relevance"); let limit = info .limit .as_deref() .unwrap_or("10") - .parse::()? + .parse::() + .wrap_request_err("invalid limit")? .min(100); - let sort = get_sort_index(config, index)?; - let client = config.make_loadbalanced_read_client()?; - let meilisearch_index = client.get_index(sort.0).await?; + let (index_name, sort_name) = get_sort_index(config, index)?; + let client = config + .make_loadbalanced_read_client() + .wrap_internal_err("failed to make load-balanced read client")?; + let meilisearch_index = client + .get_index(index_name) + .await + .wrap_internal_err("failed to get index")?; let mut filter_string = String::new(); @@ -350,13 +264,15 @@ pub async fn search_for_project( .with_page(page) .with_hits_per_page(hits_per_page) .with_query(info.query.as_deref().unwrap_or_default()) - .with_sort(&sort.1); + .with_sort(sort_name); if let Some(new_filters) = info.new_filters.as_deref() { query.with_filter(new_filters); } else { let facets = if let Some(facets) = &info.facets { - Some(serde_json::from_str::>>(facets)?) + let facets = serde_json::from_str::>>(facets) + .wrap_request_err("failed to parse facets")?; + Some(facets) } else { None }; @@ -426,7 +342,8 @@ pub async fn search_for_project( filter_string.push(')'); if !filters.is_empty() { - write!(filter_string, " AND ({filters})")?; + write!(filter_string, " AND ({filters})") + .expect("write should not fail"); } } else { filter_string.push_str(&filters); From da44fe92e8f7db16853f3322692260dcacbb650e Mon Sep 17 00:00:00 2001 From: aecsocket Date: Fri, 13 Feb 2026 18:28:38 +0000 Subject: [PATCH 04/30] start factoring meili out to trait --- ...f4eeff66ab4165a9f4980032e114db4dc1286.json | 26 -- ...d2402f52fea71e27b08e7926fcc2a9e62c0f3.json | 20 -- ...afedb074492b4ec7f2457c14113f5fd13aa02.json | 18 - ...e5c93783c7641b019fdb698a1ec0be1393606.json | 17 - .../src/search/backend/elasticsearch.rs | 174 --------- .../src/search/backend/meilisearch.rs | 340 +++++++++++++++--- apps/labrinth/src/search/backend/mod.rs | 142 +------- apps/labrinth/src/search/indexing/mod.rs | 5 +- apps/labrinth/src/search/mod.rs | 284 +-------------- 9 files changed, 313 insertions(+), 713 deletions(-) delete mode 100644 apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json delete mode 100644 apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json delete mode 100644 apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json delete mode 100644 apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json delete mode 100644 apps/labrinth/src/search/backend/elasticsearch.rs diff --git a/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json b/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json deleted file mode 100644 index 921f7f92d9..0000000000 --- a/apps/labrinth/.sqlx/query-1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n id,\n status AS \"status: PayoutStatus\"\n FROM payouts\n ORDER BY id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "status: PayoutStatus", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - }, - "hash": "1adbd24d815107e13bc1440c7a8f4eeff66ab4165a9f4980032e114db4dc1286" -} diff --git a/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json b/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json deleted file mode 100644 index 89bd8147dc..0000000000 --- a/apps/labrinth/.sqlx/query-b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT status AS \"status: PayoutStatus\" FROM payouts WHERE id = 1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "status: PayoutStatus", - "type_info": "Varchar" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "b92b5bb7d179c4fcdbc45600ccfd2402f52fea71e27b08e7926fcc2a9e62c0f3" -} diff --git a/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json b/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json deleted file mode 100644 index 469c30168a..0000000000 --- a/apps/labrinth/.sqlx/query-cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)\n VALUES ($1, $2, $3, $4, $5, 10.0, NOW())\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Text", - "Varchar", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "cd5ccd618fb3cc41646a6de86f9afedb074492b4ec7f2457c14113f5fd13aa02" -} diff --git a/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json b/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json deleted file mode 100644 index 52e020ebf2..0000000000 --- a/apps/labrinth/.sqlx/query-cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO payouts (id, method, platform_id, status, user_id, amount, created)\n VALUES ($1, $2, NULL, $3, $4, 10.00, NOW())\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Varchar", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "cec4240c7c848988b3dfd13e3f8e5c93783c7641b019fdb698a1ec0be1393606" -} diff --git a/apps/labrinth/src/search/backend/elasticsearch.rs b/apps/labrinth/src/search/backend/elasticsearch.rs deleted file mode 100644 index 17e5c842e4..0000000000 --- a/apps/labrinth/src/search/backend/elasticsearch.rs +++ /dev/null @@ -1,174 +0,0 @@ -use crate::database::redis::RedisPool; -use crate::models::ids::VersionId; -use crate::search::backend::{ - BackendType, IndexingError, SearchBackend, TaskInfo, TaskStatus, - TasksCancelFilter, -}; -use crate::search::{ - ResultSearchProject, SearchRequest, SearchResults, UploadSearchProject, -}; -use ariadne::ids::to_base62; -use async_trait::async_trait; -use elasticsearch::http::transport::{ - SingleNodeConnectionPool, TransportBuilder, -}; -use elasticsearch::{Elasticsearch, SearchParts}; -use serde_json::json; -use tracing::info; - -pub struct ElasticsearchBackend { - pub client: Elasticsearch, - pub backend_type: BackendType, - pub index_prefix: String, -} - -impl ElasticsearchBackend { - pub fn new(url: &str, index_prefix: &str) -> Result { - let url_p = url.parse::().map_err(|e| e.to_string())?; - let conn_pool = SingleNodeConnectionPool::new(url_p); - let transport = TransportBuilder::new(conn_pool) - .build() - .map_err(|e| e.to_string())?; - let client = Elasticsearch::new(transport); - - Ok(Self { - client, - backend_type: BackendType::Elasticsearch, - index_prefix: index_prefix.to_string(), - }) - } - - fn get_projects_index_name(&self, suffix: Option<&str>) -> String { - if let Some(s) = suffix { - format!("{}_projects_{}", self.index_prefix, s) - } else { - format!("{}_projects", self.index_prefix) - } - } -} - -#[async_trait] -impl SearchBackend for ElasticsearchBackend { - async fn search( - &self, - request: &SearchRequest, - ) -> Result { - let offset: usize = request.offset.as_deref().unwrap_or("0").parse()?; - let index = request.index.as_deref().unwrap_or("relevance"); - let limit = request - .limit - .as_deref() - .unwrap_or("10") - .parse::()? - .min(100); - - let index_name = match index { - "relevance" => self.get_projects_index_name(Some("relevance")), - "downloads" => self.get_projects_index_name(Some("filtered")), - "follows" => self.get_projects_index_name(Some("relevance")), - "updated" => self.get_projects_index_name(Some("relevance")), - "newest" => self.get_projects_index_name(Some("relevance")), - i => return Err(SearchError::InvalidIndex(i.to_string())), - }; - - let sort_field = match index { - "downloads" => "downloads", - "follows" => "follows", - "updated" => "date_modified", - "newest" => "date_created", - _ => "_score", - }; - - let mut query_body = json!({ - "from": offset, - "size": limit, - "sort": [format!("{}:desc", sort_field)], - }); - - if let Some(q) = request.query.as_deref() { - if !q.is_empty() { - query_body["query"] = json!({ - "multi_match": { - "query": q, - "fields": ["name^3", "summary^2", "author"] - } - }); - } - } - - if let Some(f) = request.filters.as_deref() { - query_body["query"] = json!({ - "query_string": { - "query": f - } - }); - } - - info!("Executing Elasticsearch search on index: {}", index_name); - let response = self - .client - .search(SearchParts::Index(&[&index_name])) - .body(query_body) - .send() - .await - .map_err(|e| SearchError::Elasticsearch(e.to_string()))?; - - let response_body: serde_json::Value = - response.json().await.map_err(|e| SearchError::Serde(e))?; - - let hits = - response_body["hits"]["hits"].as_array().ok_or_else(|| { - SearchError::Serde("No hits array in response".into()) - })?; - - let total_hits = response_body["hits"]["total"] - .as_object() - .and_then(|o| o.get("value")) - .and_then(|v| v.as_i64()) - .unwrap_or(0) as usize; - - let results: Vec = hits - .iter() - .filter_map(|hit| hit["_source"].as_object().cloned()) - .map(|source| serde_json::from_value(source.into())) - .collect::, _>>() - .map_err(|e| SearchError::Serde(e))?; - - Ok(SearchResults { - hits: results, - page: (offset / limit) + 1, - hits_per_page: limit, - total_hits, - }) - } - - async fn index_projects( - &self, - ro_pool: sqlx::PgPool, - _redis: RedisPool, - ) -> Result<(), IndexingError> { - info!("Elasticsearch indexing not yet implemented"); - Ok(()) - } - - async fn remove_documents( - &self, - ids: &[VersionId], - ) -> Result<(), IndexingError> { - info!("Removing {} documents from Elasticsearch", ids.len()); - Ok(()) - } - - async fn get_tasks(&self) -> Result, IndexingError> { - info!("Getting Elasticsearch tasks not yet implemented"); - Ok(vec![]) - } - - async fn cancel_tasks( - &self, - _filter: TasksCancelFilter, - ) -> Result<(), IndexingError> { - info!("Cancelling Elasticsearch tasks not yet implemented"); - Ok(()) - } -} diff --git a/apps/labrinth/src/search/backend/meilisearch.rs b/apps/labrinth/src/search/backend/meilisearch.rs index 1ba1602067..e46c3d0877 100644 --- a/apps/labrinth/src/search/backend/meilisearch.rs +++ b/apps/labrinth/src/search/backend/meilisearch.rs @@ -1,74 +1,316 @@ -use crate::database::redis::RedisPool; -use crate::models::ids::VersionId; use crate::models::projects::SearchRequest; +use crate::routes::ApiError; use crate::search::backend::{ - BackendType, IndexingError, SearchBackend, TaskInfo, TaskStatus, - TasksCancelFilter, -}; -use crate::search::indexing::cancel_tasks as meilisearch_cancel_tasks; -use crate::search::indexing::get_tasks as meilisearch_get_tasks; -use crate::search::indexing::index_projects as meilisearch_index_projects; -use crate::search::indexing::remove_documents as meilisearch_remove_documents; -use crate::search::search_for_project as meilisearch_search; -use crate::search::{ - SearchConfig, SearchError, SearchResults, UploadSearchProject, + ResultSearchProject, SearchBackend, SearchResults, }; +use crate::util::error::Context; use async_trait::async_trait; +use eyre::eyre; +use futures::TryStreamExt; +use futures::stream::FuturesOrdered; +use itertools::Itertools; +use meilisearch_sdk::client::Client; +use serde_json::Value; +use std::borrow::Cow; +use std::fmt::Write; +use tracing::{Instrument, info_span}; -pub struct MeilisearchBackend { - pub config: SearchConfig, - pub backend_type: BackendType, +#[derive(Debug, Clone)] +pub struct MeilisearchReadClient { + pub client: Client, } -impl MeilisearchBackend { - pub fn new(config: SearchConfig) -> Self { - Self { - config, - backend_type: BackendType::Meilisearch, +impl std::ops::Deref for MeilisearchReadClient { + type Target = Client; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +pub struct BatchClient { + pub clients: Vec, +} + +impl BatchClient { + pub fn new(clients: Vec) -> Self { + Self { clients } + } + + pub async fn with_all_clients<'a, T, G, Fut>( + &'a self, + task_name: &str, + generator: G, + ) -> Result, crate::search::indexing::IndexingError> + where + G: Fn(&'a Client) -> Fut, + Fut: Future> + + 'a, + { + let mut tasks = FuturesOrdered::new(); + for (idx, client) in self.clients.iter().enumerate() { + tasks.push_back(generator(client).instrument(info_span!( + "client_task", + task.name = task_name, + client.idx = idx, + ))); } + + let results = tasks.try_collect::>().await?; + Ok(results) + } + + pub fn across_all(&self, data: Vec, mut predicate: F) -> Vec + where + F: FnMut(T, &Client) -> R, + { + assert_eq!( + data.len(), + self.clients.len(), + "mismatch between data len and meilisearch client count" + ); + self.clients + .iter() + .zip(data) + .map(|(client, item)| predicate(item, client)) + .collect() } } -#[async_trait] -impl SearchBackend for MeilisearchBackend { - async fn search( - &self, - request: &SearchRequest, - ) -> Result { - meilisearch_search(request, &self.config).await +#[derive(Debug, Clone)] +pub struct SearchConfig { + pub addresses: Vec, + pub read_lb_address: String, + pub key: String, + pub meta_namespace: String, +} + +impl SearchConfig { + pub fn new(meta_namespace: Option) -> Self { + let address_many = dotenvy::var("MEILISEARCH_WRITE_ADDRS") + .expect("MEILISEARCH_WRITE_ADDRS not set"); + + let read_lb_address = dotenvy::var("MEILISEARCH_READ_ADDR") + .expect("MEILISEARCH_READ_ADDR not set"); + + let addresses = address_many + .split(',') + .filter(|s| !s.trim().is_empty()) + .map(|s| s.to_string()) + .collect::>(); + + let key = + dotenvy::var("MEILISEARCH_KEY").expect("MEILISEARCH_KEY not set"); + + Self { + addresses, + key, + meta_namespace: meta_namespace.unwrap_or_default(), + read_lb_address, + } } - async fn index_projects( + pub fn make_loadbalanced_read_client( &self, - ro_pool: sqlx::PgPool, - redis: RedisPool, - ) -> Result<(), IndexingError> { - meilisearch_index_projects(ro_pool, redis, &self.config) - .await - .map_err(|e| IndexingError::Meilisearch(e.to_string())) + ) -> Result { + Ok(MeilisearchReadClient { + client: Client::new(&self.read_lb_address, Some(&self.key))?, + }) } - async fn remove_documents( + pub fn make_batch_client( &self, - ids: &[VersionId], - ) -> Result<(), IndexingError> { - meilisearch_remove_documents(ids, &self.config) - .await - .map_err(|e| IndexingError::Meilisearch(e.to_string())) + ) -> Result { + Ok(BatchClient::new( + self.addresses + .iter() + .map(|address| { + Client::new(address.as_str(), Some(self.key.as_str())) + }) + .collect::, _>>()?, + )) } - async fn get_tasks(&self) -> Result, IndexingError> { - meilisearch_get_tasks(&self.config) - .await - .map_err(|e| IndexingError::Meilisearch(e.to_string())) + pub fn get_index_name(&self, index: &str, next: bool) -> String { + let alt = if next { "_alt" } else { "" }; + format!("{}_{}_{}", self.meta_namespace, index, alt) } +} - async fn cancel_tasks( +pub struct MeilisearchBackend { + pub config: SearchConfig, +} + +impl MeilisearchBackend { + pub fn new(config: SearchConfig) -> Self { + Self { config } + } + + fn get_sort_index( &self, - filter: TasksCancelFilter, - ) -> Result<(), IndexingError> { - meilisearch_cancel_tasks(&self.config, filter) + index: &str, + ) -> Result<(String, &'static [&'static str]), ApiError> { + let projects_name = self.config.get_index_name("projects", false); + let projects_filtered_name = + self.config.get_index_name("projects_filtered", false); + Ok(match index { + "relevance" => (projects_name, &["downloads:desc"]), + "downloads" => (projects_filtered_name, &["downloads:desc"]), + "follows" => (projects_name, &["follows:desc"]), + "updated" => (projects_name, &["date_modified:desc"]), + "newest" => (projects_name, &["date_created:desc"]), + _ => { + return Err(ApiError::Request(eyre!( + "invalid index `{index}`" + ))); + } + }) + } +} + +#[async_trait] +impl SearchBackend for MeilisearchBackend { + async fn search_for_project( + &self, + info: &SearchRequest, + ) -> Result { + let offset: usize = info + .offset + .as_deref() + .unwrap_or("0") + .parse() + .wrap_request_err("invalid offset")?; + let index = info.index.as_deref().unwrap_or("relevance"); + let limit = info + .limit + .as_deref() + .unwrap_or("10") + .parse::() + .wrap_request_err("invalid limit")? + .min(100); + + let (index_name, sort_name) = self.get_sort_index(index)?; + let client = self + .config + .make_loadbalanced_read_client() + .wrap_internal_err("failed to make load-balanced read client")?; + let meilisearch_index = client + .get_index(index_name) .await - .map_err(|e| IndexingError::Meilisearch(e.to_string())) + .wrap_internal_err("failed to get index")?; + + let mut filter_string = String::new(); + + let hits_per_page = if limit == 0 { 1 } else { limit }; + + let page = offset / hits_per_page + 1; + + let results = { + let mut query = meilisearch_index.search(); + query + .with_page(page) + .with_hits_per_page(hits_per_page) + .with_query(info.query.as_deref().unwrap_or_default()) + .with_sort(sort_name); + + if let Some(new_filters) = info.new_filters.as_deref() { + query.with_filter(new_filters); + } else { + let facets = if let Some(facets) = &info.facets { + let facets = + serde_json::from_str::>>(facets) + .wrap_request_err("failed to parse facets")?; + Some(facets) + } else { + None + }; + + let filters: Cow<_> = + match (info.filters.as_deref(), info.version.as_deref()) { + (Some(f), Some(v)) => format!("({f}) AND ({v})").into(), + (Some(f), None) => f.into(), + (None, Some(v)) => v.into(), + (None, None) => "".into(), + }; + + if let Some(facets) = facets { + let facets: Vec>> = + facets + .into_iter() + .map(|facets| { + facets + .into_iter() + .map(|facet| { + if facet.is_array() { + serde_json::from_value::>(facet) + .unwrap_or_default() + } else { + vec![ + serde_json::from_value::(facet) + .unwrap_or_default(), + ] + } + }) + .collect_vec() + }) + .collect_vec(); + + filter_string.push('('); + for (index, facet_outer_list) in facets.iter().enumerate() { + filter_string.push('('); + + for (facet_outer_index, facet_inner_list) in + facet_outer_list.iter().enumerate() + { + filter_string.push('('); + for (facet_inner_index, facet) in + facet_inner_list.iter().enumerate() + { + filter_string + .push_str(&facet.replace(':', " = ")); + if facet_inner_index + != (facet_inner_list.len() - 1) + { + filter_string.push_str(" AND ") + } + } + filter_string.push(')'); + + if facet_outer_index != (facet_outer_list.len() - 1) + { + filter_string.push_str(" OR ") + } + } + + filter_string.push(')'); + + if index != (facets.len() - 1) { + filter_string.push_str(" AND ") + } + } + filter_string.push(')'); + + if !filters.is_empty() { + write!(filter_string, " AND ({filters})") + .expect("write should not fail"); + } + } else { + filter_string.push_str(&filters); + } + + if !filter_string.is_empty() { + query.with_filter(&filter_string); + } + } + + query.execute::().await? + }; + + Ok(SearchResults { + hits: results.hits.into_iter().map(|r| r.result).collect(), + page: results.page.unwrap_or_default(), + hits_per_page: results.hits_per_page.unwrap_or_default(), + total_hits: results.total_hits.unwrap_or_default(), + }) } } diff --git a/apps/labrinth/src/search/backend/mod.rs b/apps/labrinth/src/search/backend/mod.rs index db1314d2f3..6756aaae26 100644 --- a/apps/labrinth/src/search/backend/mod.rs +++ b/apps/labrinth/src/search/backend/mod.rs @@ -1,137 +1,9 @@ -use crate::models::projects::SearchRequest; -use crate::search::SearchResults; -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +pub mod meilisearch; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum BackendKind { - Meilisearch, - Elasticsearch, -} +pub use self::meilisearch::{ + BatchClient, MeilisearchBackend, MeilisearchReadClient, SearchConfig, +}; -#[derive(Debug, Clone)] -pub struct TasksCancelFilter { - pub index_name: Option, - pub cancel_all: bool, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TaskInfo { - pub uid: u32, - pub status: TaskStatus, - pub duration: Option, - pub enqueued_at: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum TaskStatus { - Enqueued, - Processing, - Failed, - Succeeded, -} - -#[async_trait] -pub trait SearchBackend: Send + Sync { - async fn search( - &self, - request: &SearchRequest, - ) -> Result; - - async fn index_projects( - &self, - ro_pool: sqlx::PgPool, - redis: crate::database::redis::RedisPool, - ) -> Result<(), IndexingError>; - - async fn remove_documents( - &self, - ids: &[crate::models::ids::VersionId], - ) -> Result<(), IndexingError>; - - async fn get_tasks(&self) -> Result, IndexingError>; - - async fn cancel_tasks( - &self, - filter: TasksCancelFilter, - ) -> Result<(), IndexingError>; -} - -#[derive(Error, Debug)] -pub enum SearchError { - #[error("Meilisearch Error: {0}")] - Meilisearch(#[from] meilisearch_sdk::errors::Error), - #[error("Elasticsearch Error: {0}")] - Elasticsearch(String), - #[error("Error while serializing or deserializing JSON: {0}")] - Serde(#[from] serde_json::Error), - #[error("Error while parsing an integer: {0}")] - IntParsing(#[from] std::num::ParseIntError), - #[error("Error while formatting strings: {0}")] - FormatError(#[from] std::fmt::Error), - #[error("Environment Error")] - Env(#[from] dotenvy::Error), - #[error("Invalid index to sort by: {0}")] - InvalidIndex(String), - #[error("Unknown backend type: {0}")] - UnknownBackend(String), -} - -#[derive(Error, Debug)] -pub enum IndexingError { - #[error("Meilisearch Indexing Error: {0}")] - Meilisearch(String), - #[error("Elasticsearch Indexing Error: {0}")] - Elasticsearch(String), - #[error("Database Error: {0}")] - Database(#[from] sqlx::Error), - #[error("Error while serializing or deserializing JSON: {0}")] - Serde(#[from] serde_json::Error), - #[error("Error while awaiting index creation task")] - Task, - #[error("Environment Error")] - Env(#[from] dotenvy::Error), -} - -pub fn get_backend() -> Result< - Box, - SearchError, -> { - use crate::search::backend::SearchBackend; - use crate::search::backend::{ - BackendKind, ElasticsearchBackend, MeilisearchBackend, - }; - - let backend_type_str = dotenvy::var("SEARCH_BACKEND") - .unwrap_or_else(|_| "meilisearch".to_string()); - - let backend_type = match backend_type_str.as_str() { - "elasticsearch" => BackendKind::Elasticsearch, - _ => BackendKind::Meilisearch, - }; - - match backend_type { - BackendKind::Elasticsearch => { - let url = dotenvy::var("ELASTICSEARCH_URL") - .unwrap_or_else(|_| "http://localhost:9200".to_string()); - let index_prefix = dotenvy::var("ELASTICSEARCH_INDEX_PREFIX") - .unwrap_or_else(|_| "labrinth".to_string()); - - info!( - "Using Elasticsearch backend at {} with prefix {}", - url, index_prefix - ); - let backend = ElasticsearchBackend::new(&url, &index_prefix) - .map_err(|e| SearchError::Elasticsearch(e))?; - Ok(Box::new(backend)) - } - BackendKind::Meilisearch => { - let meta_namespace = - dotenvy::var("MEILISEARCH_META_NAMESPACE").ok(); - info!("Using Meilisearch backend"); - let config = SearchConfig::new(meta_namespace); - let backend = MeilisearchBackend::new(config); - Ok(Box::new(backend)) - } - } -} +pub use super::{ + ResultSearchProject, SearchBackend, SearchResults, UploadSearchProject, +}; diff --git a/apps/labrinth/src/search/indexing/mod.rs b/apps/labrinth/src/search/indexing/mod.rs index 02c7382913..7fa62fdf92 100644 --- a/apps/labrinth/src/search/indexing/mod.rs +++ b/apps/labrinth/src/search/indexing/mod.rs @@ -5,9 +5,8 @@ use std::time::Duration; use crate::database::PgPool; use crate::database::redis::RedisPool; -use crate::env::ENV; -use crate::search::{SearchConfig, UploadSearchProject}; -use crate::util::error::Context; +use crate::search::backend::UploadSearchProject; +use crate::search::backend::meilisearch::SearchConfig; use ariadne::ids::base62_impl::to_base62; use eyre::eyre; use futures::StreamExt; diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 34377f5b96..3bbddac3ad 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -1,135 +1,22 @@ use crate::env::ENV; use crate::models::projects::SearchRequest; use crate::routes::ApiError; -use crate::search::indexing::IndexingError; -use crate::util::error::Context; +use async_trait::async_trait; use chrono::{DateTime, Utc}; -use eyre::eyre; -use futures::TryStreamExt; -use futures::stream::FuturesOrdered; -use itertools::Itertools; -use meilisearch_sdk::client::Client; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::borrow::Cow; use std::collections::HashMap; -use std::fmt::Write; -use tracing::{Instrument, info_span}; -// pub mod backend; +pub mod backend; pub mod indexing; -#[derive(Debug, Clone)] -pub struct MeilisearchReadClient { - pub client: Client, -} - -impl std::ops::Deref for MeilisearchReadClient { - type Target = Client; - - fn deref(&self) -> &Self::Target { - &self.client - } -} - -pub struct BatchClient { - pub clients: Vec, -} - -impl BatchClient { - pub fn new(clients: Vec) -> Self { - Self { clients } - } - - pub async fn with_all_clients<'a, T, G, Fut>( - &'a self, - task_name: &str, - generator: G, - ) -> Result, IndexingError> - where - G: Fn(&'a Client) -> Fut, - Fut: Future> + 'a, - { - let mut tasks = FuturesOrdered::new(); - for (idx, client) in self.clients.iter().enumerate() { - tasks.push_back(generator(client).instrument(info_span!( - "client_task", - task.name = task_name, - client.idx = idx, - ))); - } - - let results = tasks.try_collect::>().await?; - Ok(results) - } - - pub fn across_all(&self, data: Vec, mut predicate: F) -> Vec - where - F: FnMut(T, &Client) -> R, - { - assert_eq!( - data.len(), - self.clients.len(), - "mismatch between data len and meilisearch client count" - ); - self.clients - .iter() - .zip(data) - .map(|(client, item)| predicate(item, client)) - .collect() - } -} - -#[derive(Debug, Clone)] -pub struct SearchConfig { - pub addresses: Vec, - pub read_lb_address: String, - pub key: String, - pub meta_namespace: String, -} - -impl SearchConfig { - // Panics if the environment variables are not set, - // but these are already checked for on startup. - pub fn new(meta_namespace: Option) -> Self { - Self { - addresses: ENV.MEILISEARCH_WRITE_ADDRS.0.clone(), - key: ENV.MEILISEARCH_KEY.clone(), - meta_namespace: meta_namespace.unwrap_or_default(), - read_lb_address: ENV.MEILISEARCH_READ_ADDR.clone(), - } - } - - pub fn make_loadbalanced_read_client( - &self, - ) -> Result { - Ok(MeilisearchReadClient { - client: Client::new(&self.read_lb_address, Some(&self.key))?, - }) - } - - pub fn make_batch_client( +#[async_trait] +pub trait SearchBackend: Send + Sync { + async fn search_for_project( &self, - ) -> Result { - Ok(BatchClient::new( - self.addresses - .iter() - .map(|address| { - Client::new(address.as_str(), Some(self.key.as_str())) - }) - .collect::, _>>()?, - )) - } - - // Next: true if we want the next index (we are preparing the next swap), false if we want the current index (searching) - pub fn get_index_name(&self, index: &str, next: bool) -> String { - let alt = if next { "_alt" } else { "" }; - format!("{}_{}_{}", self.meta_namespace, index, alt) - } + info: &SearchRequest, + ) -> Result; } -/// A project document used for uploading projects to MeiliSearch's indices. -/// This contains some extra data that is not returned by search results. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct UploadSearchProject { pub version_id: String, @@ -206,161 +93,16 @@ pub struct ResultSearchProject { pub loader_fields: HashMap>, } -pub fn get_sort_index( - config: &SearchConfig, - index: &str, -) -> Result<(String, &'static [&'static str]), ApiError> { - let projects_name = config.get_index_name("projects", false); - let projects_filtered_name = - config.get_index_name("projects_filtered", false); - Ok(match index { - "relevance" => (projects_name, &["downloads:desc"]), - "downloads" => (projects_filtered_name, &["downloads:desc"]), - "follows" => (projects_name, &["follows:desc"]), - "updated" => (projects_name, &["date_modified:desc"]), - "newest" => (projects_name, &["date_created:desc"]), - _ => return Err(ApiError::Request(eyre!("invalid index `{index}`"))), - }) -} +pub use backend::SearchConfig; +/// Backwards-compatible function for existing code. +/// TODO: Migrate all usages to use the SearchBackend trait directly. pub async fn search_for_project( info: &SearchRequest, config: &SearchConfig, ) -> Result { - let offset: usize = info - .offset - .as_deref() - .unwrap_or("0") - .parse() - .wrap_request_err("invalid offset")?; - let index = info.index.as_deref().unwrap_or("relevance"); - let limit = info - .limit - .as_deref() - .unwrap_or("10") - .parse::() - .wrap_request_err("invalid limit")? - .min(100); - - let (index_name, sort_name) = get_sort_index(config, index)?; - let client = config - .make_loadbalanced_read_client() - .wrap_internal_err("failed to make load-balanced read client")?; - let meilisearch_index = client - .get_index(index_name) - .await - .wrap_internal_err("failed to get index")?; - - let mut filter_string = String::new(); - - // Convert offset and limit to page and hits_per_page - let hits_per_page = if limit == 0 { 1 } else { limit }; - - let page = offset / hits_per_page + 1; - - let results = { - let mut query = meilisearch_index.search(); - query - .with_page(page) - .with_hits_per_page(hits_per_page) - .with_query(info.query.as_deref().unwrap_or_default()) - .with_sort(sort_name); - - if let Some(new_filters) = info.new_filters.as_deref() { - query.with_filter(new_filters); - } else { - let facets = if let Some(facets) = &info.facets { - let facets = serde_json::from_str::>>(facets) - .wrap_request_err("failed to parse facets")?; - Some(facets) - } else { - None - }; - - let filters: Cow<_> = - match (info.filters.as_deref(), info.version.as_deref()) { - (Some(f), Some(v)) => format!("({f}) AND ({v})").into(), - (Some(f), None) => f.into(), - (None, Some(v)) => v.into(), - (None, None) => "".into(), - }; - - if let Some(facets) = facets { - // Search can now *optionally* have a third inner array: So Vec(AND)>> - // For every inner facet, we will check if it can be deserialized into a Vec<&str>, and do so. - // If not, we will assume it is a single facet and wrap it in a Vec. - let facets: Vec>> = facets - .into_iter() - .map(|facets| { - facets - .into_iter() - .map(|facet| { - if facet.is_array() { - serde_json::from_value::>(facet) - .unwrap_or_default() - } else { - vec![ - serde_json::from_value::(facet) - .unwrap_or_default(), - ] - } - }) - .collect_vec() - }) - .collect_vec(); - - filter_string.push('('); - for (index, facet_outer_list) in facets.iter().enumerate() { - filter_string.push('('); - - for (facet_outer_index, facet_inner_list) in - facet_outer_list.iter().enumerate() - { - filter_string.push('('); - for (facet_inner_index, facet) in - facet_inner_list.iter().enumerate() - { - filter_string.push_str(&facet.replace(':', " = ")); - if facet_inner_index != (facet_inner_list.len() - 1) - { - filter_string.push_str(" AND ") - } - } - filter_string.push(')'); - - if facet_outer_index != (facet_outer_list.len() - 1) { - filter_string.push_str(" OR ") - } - } - - filter_string.push(')'); - - if index != (facets.len() - 1) { - filter_string.push_str(" AND ") - } - } - filter_string.push(')'); - - if !filters.is_empty() { - write!(filter_string, " AND ({filters})") - .expect("write should not fail"); - } - } else { - filter_string.push_str(&filters); - } - - if !filter_string.is_empty() { - query.with_filter(&filter_string); - } - } - - query.execute::().await? - }; + use backend::meilisearch::MeilisearchBackend; - Ok(SearchResults { - hits: results.hits.into_iter().map(|r| r.result).collect(), - page: results.page.unwrap_or_default(), - hits_per_page: results.hits_per_page.unwrap_or_default(), - total_hits: results.total_hits.unwrap_or_default(), - }) + let backend = MeilisearchBackend::new(config.clone()); + backend.search_for_project(info).await } From 1d17f617a972f75455cb4a50310707c0d8454f3e Mon Sep 17 00:00:00 2001 From: aecsocket Date: Fri, 13 Feb 2026 18:46:56 +0000 Subject: [PATCH 05/30] move meili to backend --- .../{ => backend/meilisearch}/indexing/local_import.rs | 0 .../src/search/{ => backend/meilisearch}/indexing/mod.rs | 0 .../search/backend/{meilisearch.rs => meilisearch/mod.rs} | 7 ++++--- apps/labrinth/src/search/mod.rs | 7 ++++++- 4 files changed, 10 insertions(+), 4 deletions(-) rename apps/labrinth/src/search/{ => backend/meilisearch}/indexing/local_import.rs (100%) rename apps/labrinth/src/search/{ => backend/meilisearch}/indexing/mod.rs (100%) rename apps/labrinth/src/search/backend/{meilisearch.rs => meilisearch/mod.rs} (98%) diff --git a/apps/labrinth/src/search/indexing/local_import.rs b/apps/labrinth/src/search/backend/meilisearch/indexing/local_import.rs similarity index 100% rename from apps/labrinth/src/search/indexing/local_import.rs rename to apps/labrinth/src/search/backend/meilisearch/indexing/local_import.rs diff --git a/apps/labrinth/src/search/indexing/mod.rs b/apps/labrinth/src/search/backend/meilisearch/indexing/mod.rs similarity index 100% rename from apps/labrinth/src/search/indexing/mod.rs rename to apps/labrinth/src/search/backend/meilisearch/indexing/mod.rs diff --git a/apps/labrinth/src/search/backend/meilisearch.rs b/apps/labrinth/src/search/backend/meilisearch/mod.rs similarity index 98% rename from apps/labrinth/src/search/backend/meilisearch.rs rename to apps/labrinth/src/search/backend/meilisearch/mod.rs index e46c3d0877..43994f8b14 100644 --- a/apps/labrinth/src/search/backend/meilisearch.rs +++ b/apps/labrinth/src/search/backend/meilisearch/mod.rs @@ -15,6 +15,8 @@ use std::borrow::Cow; use std::fmt::Write; use tracing::{Instrument, info_span}; +pub mod indexing; + #[derive(Debug, Clone)] pub struct MeilisearchReadClient { pub client: Client, @@ -41,11 +43,10 @@ impl BatchClient { &'a self, task_name: &str, generator: G, - ) -> Result, crate::search::indexing::IndexingError> + ) -> Result, indexing::IndexingError> where G: Fn(&'a Client) -> Fut, - Fut: Future> - + 'a, + Fut: Future> + 'a, { let mut tasks = FuturesOrdered::new(); for (idx, client) in self.clients.iter().enumerate() { diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 3bbddac3ad..d4e9334cc5 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -7,7 +7,12 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; pub mod backend; -pub mod indexing; + +// Backwards-compatible re-export of the meilisearch indexing module +// TODO: Migrate all usages to use backend::meilisearch::indexing directly +pub mod indexing { + pub use crate::search::backend::meilisearch::indexing::*; +} #[async_trait] pub trait SearchBackend: Send + Sync { From 5e112e93357f223384e78bcab9e842f57e160064 Mon Sep 17 00:00:00 2001 From: aecsocket Date: Fri, 13 Feb 2026 19:19:59 +0000 Subject: [PATCH 06/30] update routes to use search backend trait --- apps/labrinth/src/lib.rs | 5 +++++ apps/labrinth/src/routes/v2/projects.rs | 6 +++--- apps/labrinth/src/routes/v3/projects.rs | 6 +++--- apps/labrinth/src/search/mod.rs | 6 ++++++ 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 4b6f484a95..130a5104a5 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -57,6 +57,7 @@ pub struct LabrinthConfig { pub scheduler: Arc, pub ip_salt: Pepper, pub search_config: search::SearchConfig, + pub search_backend: web::Data>, pub session_queue: web::Data, pub payouts_queue: web::Data, pub analytics_queue: Arc, @@ -261,6 +262,8 @@ pub fn app_setup( }); } + let search_backend = web::Data::new(search::backend(search_config.clone())); + LabrinthConfig { pool, ro_pool, @@ -270,6 +273,7 @@ pub fn app_setup( scheduler: Arc::new(scheduler), ip_salt, search_config, + search_backend, session_queue, payouts_queue: web::Data::new(PayoutsQueue::new()), analytics_queue, @@ -308,6 +312,7 @@ pub fn app_config( .app_data(web::Data::new(labrinth_config.ro_pool.clone())) .app_data(web::Data::new(labrinth_config.file_host.clone())) .app_data(web::Data::new(labrinth_config.search_config.clone())) + .app_data(labrinth_config.search_backend.clone()) .app_data(web::Data::new(labrinth_config.gotenberg_client.clone())) .app_data(labrinth_config.session_queue.clone()) .app_data(labrinth_config.payouts_queue.clone()) diff --git a/apps/labrinth/src/routes/v2/projects.rs b/apps/labrinth/src/routes/v2/projects.rs index fbbf01f4f1..ea4e4c778f 100644 --- a/apps/labrinth/src/routes/v2/projects.rs +++ b/apps/labrinth/src/routes/v2/projects.rs @@ -14,7 +14,7 @@ use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::session::AuthQueue; use crate::routes::v3::projects::ProjectIds; use crate::routes::{ApiError, v2_reroute, v3}; -use crate::search::{SearchConfig, search_for_project}; +use crate::search::{SearchBackend, SearchConfig}; use actix_web::{HttpRequest, HttpResponse, delete, get, patch, post, web}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -53,7 +53,7 @@ pub fn config(cfg: &mut web::ServiceConfig) { #[get("search")] pub async fn project_search( web::Query(info): web::Query, - config: web::Data, + search_backend: web::Data>, ) -> Result { // Search now uses loader_fields instead of explicit 'client_side' and 'server_side' fields // While the backend for this has changed, it doesnt affect much @@ -99,7 +99,7 @@ pub async fn project_search( ..info }; - let results = search_for_project(&info, &config).await?; + let results = search_backend.search_for_project(&info).await?; let results = LegacySearchResults::from(results); diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index 559453d9f2..293c8c2ae4 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -30,7 +30,7 @@ use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::routes::internal::delphi; use crate::search::indexing::remove_documents; -use crate::search::{SearchConfig, SearchResults, search_for_project}; +use crate::search::{SearchBackend, SearchConfig, SearchResults}; use crate::util::img; use crate::util::img::{delete_old_images, upload_image_optimized}; use crate::util::routes::read_limited_from_payload; @@ -1038,9 +1038,9 @@ pub async fn edit_project_categories( pub async fn project_search( web::Query(info): web::Query, - config: web::Data, + search_backend: web::Data>, ) -> Result, ApiError> { - let results = search_for_project(&info, &config).await?; + let results = search_backend.search_for_project(&info).await?; // TODO: add this back // let results = ReturnSearchResults { diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index d4e9334cc5..e557f6aae6 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -100,6 +100,12 @@ pub struct ResultSearchProject { pub use backend::SearchConfig; +/// Creates and returns a boxed SearchBackend. +/// Currently returns a MeilisearchBackend, but can be swapped for other implementations. +pub fn backend(config: SearchConfig) -> Box { + Box::new(backend::meilisearch::MeilisearchBackend::new(config)) +} + /// Backwards-compatible function for existing code. /// TODO: Migrate all usages to use the SearchBackend trait directly. pub async fn search_for_project( From 4ffafc11db222a2e0a88f02cedf20b8b227ffb1a Mon Sep 17 00:00:00 2001 From: aecsocket Date: Mon, 23 Feb 2026 12:38:06 +0000 Subject: [PATCH 07/30] wip --- apps/labrinth/src/background_task.rs | 10 ++--- apps/labrinth/src/lib.rs | 7 ++- apps/labrinth/src/main.rs | 1 - apps/labrinth/src/routes/v2/versions.rs | 6 +-- apps/labrinth/src/routes/v3/projects.rs | 44 +++++++++---------- apps/labrinth/src/routes/v3/versions.rs | 6 +-- .../src/search/backend/meilisearch/mod.rs | 22 ++++++++++ apps/labrinth/src/search/mod.rs | 33 +++++++------- 8 files changed, 76 insertions(+), 53 deletions(-) diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 1783e36e26..bd82ae7bfc 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -7,7 +7,7 @@ use crate::queue::payouts::{ insert_bank_balances_and_webhook, process_affiliate_payouts, process_payout, remove_payouts_for_refunded_charges, }; -use crate::search::indexing::index_projects; +use crate::search::SearchBackend; use crate::util::anrok; use crate::{database, search}; use clap::ValueEnum; @@ -34,7 +34,6 @@ impl BackgroundTask { pool: PgPool, ro_pool: PgPool, redis_pool: RedisPool, - search_config: search::SearchConfig, clickhouse: clickhouse::Client, stripe_client: stripe::Client, anrok_client: anrok::Client, @@ -45,7 +44,8 @@ impl BackgroundTask { match self { Migrations => run_migrations().await, IndexSearch => { - index_search(ro_pool, redis_pool, search_config).await + let search_backend = search::backend(); + index_search(ro_pool, redis_pool, search_backend).await } ReleaseScheduled => release_scheduled(pool).await, UpdateVersions => update_versions(pool, redis_pool).await, @@ -122,10 +122,10 @@ pub async fn run_migrations() { pub async fn index_search( ro_pool: PgPool, redis_pool: RedisPool, - search_config: search::SearchConfig, + search_backend: Box, ) { info!("Indexing local database"); - let result = index_projects(ro_pool, redis_pool, &search_config).await; + let result = search_backend.index_projects(ro_pool, redis_pool).await; if let Err(e) = result { warn!("Local project indexing failed: {:?}", e); } diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index 130a5104a5..c0b544f773 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -114,17 +114,16 @@ pub fn app_setup( let local_index_interval = Duration::from_secs(ENV.LOCAL_INDEX_INTERVAL); let pool_ref = pool.clone(); - let search_config_ref = search_config.clone(); let redis_pool_ref = redis_pool.clone(); scheduler.run(local_index_interval, move || { let pool_ref = pool_ref.clone(); let redis_pool_ref = redis_pool_ref.clone(); - let search_config_ref = search_config_ref.clone(); async move { + let search_backend = search::backend(); background_task::index_search( pool_ref, redis_pool_ref, - search_config_ref, + search_backend, ) .await; } @@ -262,7 +261,7 @@ pub fn app_setup( }); } - let search_backend = web::Data::new(search::backend(search_config.clone())); + let search_backend = web::Data::new(search::backend()); LabrinthConfig { pool, diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index d18984391c..64a87759c8 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -171,7 +171,6 @@ async fn app() -> std::io::Result<()> { pool, ro_pool.into_inner(), redis_pool, - search_config, clickhouse, stripe_client, anrok_client.clone(), diff --git a/apps/labrinth/src/routes/v2/versions.rs b/apps/labrinth/src/routes/v2/versions.rs index 708f35b729..8c5d2d9fd0 100644 --- a/apps/labrinth/src/routes/v2/versions.rs +++ b/apps/labrinth/src/routes/v2/versions.rs @@ -11,7 +11,7 @@ use crate::models::projects::{ use crate::models::v2::projects::LegacyVersion; use crate::queue::session::AuthQueue; use crate::routes::{v2_reroute, v3}; -use crate::search::SearchConfig; +use crate::search::SearchBackend; use actix_web::{HttpRequest, HttpResponse, delete, get, patch, web}; use serde::{Deserialize, Serialize}; use validator::Validate; @@ -349,7 +349,7 @@ pub async fn version_delete( pool: web::Data, redis: web::Data, session_queue: web::Data, - search_config: web::Data, + search_backend: web::Data>, ) -> Result { // Returns NoContent, so we don't need to convert the response v3::versions::version_delete( @@ -358,7 +358,7 @@ pub async fn version_delete( pool, redis, session_queue, - search_config, + search_backend, ) .await .or_else(v2_reroute::flatten_404_error) diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index 293c8c2ae4..855cb865b1 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -263,7 +263,7 @@ pub async fn project_edit( req: HttpRequest, info: web::Path<(String,)>, pool: web::Data, - search_config: web::Data, + search_backend: web::Data>, new_project: web::Json, redis: web::Data, session_queue: web::Data, @@ -973,16 +973,16 @@ pub async fn project_edit( project_item.inner.status.is_searchable(), new_project.status.map(|status| status.is_searchable()), ) { - remove_documents( - &project_item - .versions - .into_iter() - .map(|x| x.into()) - .collect::>(), - &search_config, - ) - .await - .wrap_internal_err("failed to remove documents")?; + search_backend + .remove_documents( + &project_item + .versions + .into_iter() + .map(|x| x.into()) + .collect::>(), + ) + .await + .wrap_internal_err("failed to remove documents")?; } Ok(HttpResponse::NoContent().body("")) @@ -2157,7 +2157,7 @@ pub async fn project_delete( info: web::Path<(String,)>, pool: web::Data, redis: web::Data, - search_config: web::Data, + search_backend: web::Data>, session_queue: web::Data, ) -> Result<(), ApiError> { let (_, user) = get_user_from_headers( @@ -2269,16 +2269,16 @@ pub async fn project_delete( .await .wrap_internal_err("failed to commit transaction")?; - remove_documents( - &project - .versions - .into_iter() - .map(|x| x.into()) - .collect::>(), - &search_config, - ) - .await - .wrap_internal_err("failed to remove project version documents")?; + search_backend + .remove_documents( + &project + .versions + .into_iter() + .map(|x| x.into()) + .collect::>(), + ) + .await + .wrap_internal_err("failed to remove project version documents")?; if result.is_some() { Ok(()) diff --git a/apps/labrinth/src/routes/v3/versions.rs b/apps/labrinth/src/routes/v3/versions.rs index 5912ca4300..663112e701 100644 --- a/apps/labrinth/src/routes/v3/versions.rs +++ b/apps/labrinth/src/routes/v3/versions.rs @@ -894,7 +894,7 @@ pub async fn version_delete( pool: web::Data, redis: web::Data, session_queue: web::Data, - search_config: web::Data, + search_backend: web::Data>, ) -> Result { let user = get_user_from_headers( &req, @@ -1001,10 +1001,10 @@ pub async fn version_delete( &redis, ) .await?; - remove_documents(&[version.inner.id.into()], &search_config) + search_backend + .remove_documents(&[version.inner.id.into()]) .await .wrap_internal_err("failed to remove documents")?; - if result.is_some() { Ok(HttpResponse::NoContent().body("")) } else { diff --git a/apps/labrinth/src/search/backend/meilisearch/mod.rs b/apps/labrinth/src/search/backend/meilisearch/mod.rs index 43994f8b14..ad36073c11 100644 --- a/apps/labrinth/src/search/backend/meilisearch/mod.rs +++ b/apps/labrinth/src/search/backend/meilisearch/mod.rs @@ -1,3 +1,6 @@ +use crate::database::PgPool; +use crate::database::redis::RedisPool; +use crate::models::ids::VersionId; use crate::models::projects::SearchRequest; use crate::routes::ApiError; use crate::search::backend::{ @@ -314,4 +317,23 @@ impl SearchBackend for MeilisearchBackend { total_hits: results.total_hits.unwrap_or_default(), }) } + + async fn index_projects( + &self, + ro_pool: PgPool, + redis: RedisPool, + ) -> Result<(), ApiError> { + indexing::index_projects(ro_pool, redis, &self.config) + .await + .map_err(|e| ApiError::Internal(e.into())) + } + + async fn remove_documents( + &self, + ids: &[VersionId], + ) -> Result<(), ApiError> { + indexing::remove_documents(ids, &self.config) + .await + .map_err(|e| ApiError::Internal(e.into())) + } } diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index e557f6aae6..54a50dbca9 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -1,4 +1,7 @@ +use crate::database::PgPool; +use crate::database::redis::RedisPool; use crate::env::ENV; +use crate::models::ids::VersionId; use crate::models::projects::SearchRequest; use crate::routes::ApiError; use async_trait::async_trait; @@ -20,6 +23,15 @@ pub trait SearchBackend: Send + Sync { &self, info: &SearchRequest, ) -> Result; + + async fn index_projects( + &self, + ro_pool: PgPool, + redis: RedisPool, + ) -> Result<(), ApiError>; + + async fn remove_documents(&self, ids: &[VersionId]) + -> Result<(), ApiError>; } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -98,22 +110,13 @@ pub struct ResultSearchProject { pub loader_fields: HashMap>, } -pub use backend::SearchConfig; +/// Re-export of Meilisearch-specific SearchConfig for backwards compatibility. +/// TODO: Remove this when all usages are migrated to use SearchBackend trait. +pub use backend::meilisearch::SearchConfig; -/// Creates and returns a boxed SearchBackend. +/// Creates and returns a boxed SearchBackend with default configuration. /// Currently returns a MeilisearchBackend, but can be swapped for other implementations. -pub fn backend(config: SearchConfig) -> Box { +pub fn backend() -> Box { + let config = backend::meilisearch::SearchConfig::new(None); Box::new(backend::meilisearch::MeilisearchBackend::new(config)) } - -/// Backwards-compatible function for existing code. -/// TODO: Migrate all usages to use the SearchBackend trait directly. -pub async fn search_for_project( - info: &SearchRequest, - config: &SearchConfig, -) -> Result { - use backend::meilisearch::MeilisearchBackend; - - let backend = MeilisearchBackend::new(config.clone()); - backend.search_for_project(info).await -} From 71e3256eeaffe7821ba962b059aeacd1a706425c Mon Sep 17 00:00:00 2001 From: aecsocket Date: Mon, 23 Feb 2026 12:38:16 +0000 Subject: [PATCH 08/30] Update projects.rs --- apps/labrinth/src/routes/v2/projects.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/labrinth/src/routes/v2/projects.rs b/apps/labrinth/src/routes/v2/projects.rs index ea4e4c778f..57ff925634 100644 --- a/apps/labrinth/src/routes/v2/projects.rs +++ b/apps/labrinth/src/routes/v2/projects.rs @@ -14,7 +14,7 @@ use crate::queue::moderation::AutomatedModerationQueue; use crate::queue::session::AuthQueue; use crate::routes::v3::projects::ProjectIds; use crate::routes::{ApiError, v2_reroute, v3}; -use crate::search::{SearchBackend, SearchConfig}; +use crate::search::SearchBackend; use actix_web::{HttpRequest, HttpResponse, delete, get, patch, post, web}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -411,7 +411,7 @@ pub async fn project_edit( req: HttpRequest, info: web::Path<(String,)>, pool: web::Data, - search_config: web::Data, + search_backend: web::Data>, new_project: web::Json, redis: web::Data, session_queue: web::Data, @@ -521,7 +521,7 @@ pub async fn project_edit( req.clone(), info, pool.clone(), - search_config, + search_backend, web::Json(new_project), redis.clone(), session_queue.clone(), @@ -915,7 +915,7 @@ pub async fn project_delete( info: web::Path<(String,)>, pool: web::Data, redis: web::Data, - search_config: web::Data, + search_backend: web::Data>, session_queue: web::Data, ) -> Result { // Returns NoContent, so no need to convert @@ -924,7 +924,7 @@ pub async fn project_delete( info, pool, redis, - search_config, + search_backend, session_queue, ) .await From badb907d71fd081f6f37ae25361c64df3f956816 Mon Sep 17 00:00:00 2001 From: aecsocket Date: Sun, 15 Feb 2026 15:00:03 +0000 Subject: [PATCH 09/30] search backend is only init'd once in config --- apps/labrinth/src/background_task.rs | 7 ++++--- apps/labrinth/src/lib.rs | 6 +++--- apps/labrinth/src/main.rs | 3 +++ apps/labrinth/src/test/mod.rs | 2 ++ 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index bd82ae7bfc..de4f88fca5 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -1,3 +1,4 @@ +use crate::database; use crate::database::PgPool; use crate::database::redis::RedisPool; use crate::queue::billing::{index_billing, index_subscriptions}; @@ -9,7 +10,7 @@ use crate::queue::payouts::{ }; use crate::search::SearchBackend; use crate::util::anrok; -use crate::{database, search}; +use actix_web::web; use clap::ValueEnum; use tracing::{error, info, warn}; @@ -34,6 +35,7 @@ impl BackgroundTask { pool: PgPool, ro_pool: PgPool, redis_pool: RedisPool, + search_backend: web::Data>, clickhouse: clickhouse::Client, stripe_client: stripe::Client, anrok_client: anrok::Client, @@ -44,7 +46,6 @@ impl BackgroundTask { match self { Migrations => run_migrations().await, IndexSearch => { - let search_backend = search::backend(); index_search(ro_pool, redis_pool, search_backend).await } ReleaseScheduled => release_scheduled(pool).await, @@ -122,7 +123,7 @@ pub async fn run_migrations() { pub async fn index_search( ro_pool: PgPool, redis_pool: RedisPool, - search_backend: Box, + search_backend: web::Data>, ) { info!("Indexing local database"); let result = search_backend.index_projects(ro_pool, redis_pool).await; diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs index c0b544f773..2a1c970d60 100644 --- a/apps/labrinth/src/lib.rs +++ b/apps/labrinth/src/lib.rs @@ -77,6 +77,7 @@ pub fn app_setup( ro_pool: ReadOnlyPgPool, redis_pool: RedisPool, search_config: search::SearchConfig, + search_backend: actix_web::web::Data>, clickhouse: &mut Client, file_host: Arc, stripe_client: stripe::Client, @@ -115,11 +116,12 @@ pub fn app_setup( Duration::from_secs(ENV.LOCAL_INDEX_INTERVAL); let pool_ref = pool.clone(); let redis_pool_ref = redis_pool.clone(); + let search_backend_ref = search_backend.clone(); scheduler.run(local_index_interval, move || { let pool_ref = pool_ref.clone(); let redis_pool_ref = redis_pool_ref.clone(); + let search_backend = search_backend_ref.clone(); async move { - let search_backend = search::backend(); background_task::index_search( pool_ref, redis_pool_ref, @@ -261,8 +263,6 @@ pub fn app_setup( }); } - let search_backend = web::Data::new(search::backend()); - LabrinthConfig { pool, ro_pool, diff --git a/apps/labrinth/src/main.rs b/apps/labrinth/src/main.rs index 64a87759c8..bbe97ad01e 100644 --- a/apps/labrinth/src/main.rs +++ b/apps/labrinth/src/main.rs @@ -153,6 +153,7 @@ async fn app() -> std::io::Result<()> { let mut clickhouse = clickhouse::init_client().await.unwrap(); let search_config = search::SearchConfig::new(None); + let search_backend = actix_web::web::Data::new(search::backend()); let stripe_client = stripe::Client::new(ENV.STRIPE_API_KEY.clone()); @@ -171,6 +172,7 @@ async fn app() -> std::io::Result<()> { pool, ro_pool.into_inner(), redis_pool, + search_backend, clickhouse, stripe_client, anrok_client.clone(), @@ -206,6 +208,7 @@ async fn app() -> std::io::Result<()> { ro_pool.clone(), redis_pool.clone(), search_config.clone(), + search_backend.clone(), &mut clickhouse, file_host.clone(), stripe_client, diff --git a/apps/labrinth/src/test/mod.rs b/apps/labrinth/src/test/mod.rs index 435a51c426..0166d9e2a3 100644 --- a/apps/labrinth/src/test/mod.rs +++ b/apps/labrinth/src/test/mod.rs @@ -43,12 +43,14 @@ pub async fn setup(db: &database::TemporaryDatabase) -> LabrinthConfig { EmailQueue::init(pool.clone(), redis_pool.clone()).unwrap(); let gotenberg_client = GotenbergClient::from_env(redis_pool.clone()) .expect("Failed to create Gotenberg client"); + let search_backend = actix_web::web::Data::new(crate::search::backend()); crate::app_setup( pool.clone(), ro_pool.clone(), redis_pool.clone(), search_config, + search_backend, &mut clickhouse, file_host.clone(), stripe_client, From 72eb548d41157e6c4e754842106be36dbfc17b15 Mon Sep 17 00:00:00 2001 From: aecsocket Date: Mon, 16 Feb 2026 13:35:52 +0000 Subject: [PATCH 10/30] wip --- apps/labrinth/src/routes/internal/admin.rs | 9 +- apps/labrinth/src/routes/internal/search.rs | 7 +- apps/labrinth/src/routes/mod.rs | 4 - .../src/routes/v3/project_creation.rs | 5 - .../src/search/backend/elasticsearch/mod.rs | 107 ++++++++++++++++++ apps/labrinth/src/search/backend/mod.rs | 1 + apps/labrinth/src/search/mod.rs | 6 - 7 files changed, 118 insertions(+), 21 deletions(-) create mode 100644 apps/labrinth/src/search/backend/elasticsearch/mod.rs diff --git a/apps/labrinth/src/routes/internal/admin.rs b/apps/labrinth/src/routes/internal/admin.rs index ecc230cfff..e3db1cdd93 100644 --- a/apps/labrinth/src/routes/internal/admin.rs +++ b/apps/labrinth/src/routes/internal/admin.rs @@ -7,7 +7,7 @@ use crate::models::pats::Scopes; use crate::queue::analytics::AnalyticsQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; -use crate::search::SearchConfig; +use crate::search::SearchBackend; use crate::util::date::get_current_tenths_of_ms; use crate::util::guards::admin_key_guard; use actix_web::{HttpRequest, HttpResponse, patch, post, web}; @@ -152,10 +152,11 @@ pub async fn count_download( pub async fn force_reindex( pool: web::Data, redis: web::Data, - config: web::Data, + search_backend: web::Data>, ) -> Result { - use crate::search::indexing::index_projects; let redis = redis.get_ref(); - index_projects(pool.as_ref().clone(), redis.clone(), &config).await?; + search_backend + .index_projects(pool.as_ref().clone(), redis.clone()) + .await?; Ok(HttpResponse::NoContent().finish()) } diff --git a/apps/labrinth/src/routes/internal/search.rs b/apps/labrinth/src/routes/internal/search.rs index 43bd21ecd7..18a82ab111 100644 --- a/apps/labrinth/src/routes/internal/search.rs +++ b/apps/labrinth/src/routes/internal/search.rs @@ -1,5 +1,6 @@ use crate::routes::ApiError; use crate::search::SearchConfig; +use crate::util::error::Context; use crate::util::guards::admin_key_guard; use actix_web::{HttpResponse, delete, get, web}; use meilisearch_sdk::tasks::{Task, TasksCancelQuery}; @@ -24,7 +25,8 @@ pub async fn tasks( Ok(tasks.results) }) - .await?; + .await + .wrap_internal_err("failed to get tasks")?; #[derive(Serialize, ToSchema)] struct MeiliTask