Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 87 additions & 13 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -61,6 +62,9 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep;
import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties;
Expand Down Expand Up @@ -112,6 +116,10 @@ public JdbcClientPool getConnections() {
return connections;
}

public String getCatalogKey() {
return catalogKey;
}

/** Initialize catalog tables. */
private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
String uri = options.get(CatalogOptions.URI.key());
Expand Down Expand Up @@ -301,19 +309,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
runWithLock(identifier, () -> schemaManager.createTable(schema));
// Update schema metadata
Path path = getTableLocation(identifier);
int insertRecord =
connections.run(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
sql.setString(1, catalogKey);
sql.setString(2, identifier.getDatabaseName());
sql.setString(3, identifier.getTableName());
return sql.executeUpdate();
}
});
if (insertRecord == 1) {
if (JdbcUtils.insertTable(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName())) {
LOG.debug("Successfully committed to new table: {}", identifier);
} else {
try {
Expand Down Expand Up @@ -415,6 +415,80 @@ public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exc
return Lock.fromCatalog(lock, identifier).runWithLock(callable);
}

@Override
public void repairCatalog() {
List<String> databases;
try {
databases = listDatabasesInFileSystem(new Path(warehouse));
} catch (IOException e) {
throw new RuntimeException("Failed to list databases in file system", e);
}
for (String database : databases) {
repairDatabase(database);
}
}

@Override
public void repairDatabase(String databaseName) {
checkNotSystemDatabase(databaseName);

// First check if database exists in file system
Path databasePath = newDatabasePath(databaseName);
List<String> tables;
try {
if (!fileIO.exists(databasePath)) {
throw new RuntimeException("Database directory does not exist: " + databasePath);
}
tables = listTablesInFileSystem(databasePath);
} catch (IOException e) {
throw new RuntimeException(e);
}

if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
createDatabaseImpl(databaseName, Collections.emptyMap());
}

// Repair tables
for (String table : tables) {
try {
repairTable(Identifier.create(databaseName, table));
} catch (TableNotExistException ignore) {
// Table might not exist due to concurrent operations
}
}
}

@Override
public void repairTable(Identifier identifier) throws TableNotExistException {
checkNotBranch(identifier, "repairTable");
checkNotSystemTable(identifier, "repairTable");

// First check if table exists in file system
Path tableLocation = getTableLocation(identifier);
TableSchema tableSchema =
tableSchemaInFileSystem(tableLocation, identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));

if (!JdbcUtils.databaseExists(connections, catalogKey, identifier.getDatabaseName())) {
createDatabaseImpl(identifier.getDatabaseName(), Collections.emptyMap());
}
// Table exists in file system, now check if it exists in JDBC catalog
if (!JdbcUtils.tableExists(
connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) {
// Table missing from JDBC catalog, repair it
if (JdbcUtils.insertTable(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName())) {
LOG.debug("Successfully repaired table: {}", identifier);
} else {
LOG.error("Failed to repair table: {}", identifier);
}
}
// If table exists in both file system and JDBC catalog, nothing to repair
}

@Override
public void close() throws Exception {
connections.close();
Expand Down
21 changes: 21 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,27 @@ public static int execute(
}
}

public static boolean insertTable(
JdbcClientPool connections, String catalogKey, String databaseName, String tableName) {
try {
int insertRecord =
connections.run(
conn -> {
try (PreparedStatement sql =
conn.prepareStatement(
JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) {
sql.setString(1, catalogKey);
sql.setString(2, databaseName);
sql.setString(3, tableName);
return sql.executeUpdate();
}
});
return insertRecord == 1;
} catch (SQLException | InterruptedException e) {
throw new RuntimeException("Failed to insert table: " + tableName, e);
}
}

public static boolean insertProperties(
JdbcClientPool connections,
String storeKey,
Expand Down
201 changes: 201 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -34,6 +38,8 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

Expand Down Expand Up @@ -129,4 +135,199 @@ public void testSerializeTable() throws Exception {
protected boolean supportsAlterDatabase() {
return true;
}

@Test
public void testRepairTableNotExist() throws Exception {
String databaseName = "repair_db";
String tableName = "nonexistent_table";

catalog.createDatabase(databaseName, false);
Identifier identifier = Identifier.create(databaseName, tableName);

// Test repair on non-existent table - should throw TableNotExistException
assertThatThrownBy(() -> catalog.repairTable(identifier))
.isInstanceOf(Catalog.TableNotExistException.class);
}

@Test
public void testRepairTableWithSystemTable() {
Identifier systemTableId = Identifier.create("sys", "system_table");

// System tables should not be repairable
assertThatThrownBy(() -> catalog.repairTable(systemTableId))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("sys");
}

@Test
public void testRepairTable() throws Exception {
String databaseName = "fs_repair_db";
String tableName = "fs_repair_table";

// Create table normally (this creates both filesystem and JDBC entries)
catalog.createDatabase(databaseName, false);
Identifier identifier = Identifier.create(databaseName, tableName);
catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);

// Verify table exists in both places
assertThat(catalog.listTables(databaseName)).contains(tableName);
assertDoesNotThrow(() -> catalog.getTable(identifier));

// Repair on existing table should work fine (idempotent operation)
assertDoesNotThrow(() -> catalog.repairTable(identifier));

// Table should still exist and be accessible
assertThat(catalog.listTables(databaseName)).contains(tableName);
assertDoesNotThrow(() -> catalog.getTable(identifier));

// Test repair when table is missing from JDBC store
JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog;

// Remove table from JDBC store but leave filesystem intact
JdbcUtils.execute(
jdbcCatalog.getConnections(),
JdbcUtils.DROP_TABLE_SQL,
jdbcCatalog.getCatalogKey(),
databaseName,
tableName);

// Verify table is missing from JDBC catalog
assertThat(catalog.listTables(databaseName)).doesNotContain(tableName);
assertThatThrownBy(() -> catalog.getTable(identifier))
.isInstanceOf(Catalog.TableNotExistException.class);

// Repair the table - should recreate it in JDBC store
assertDoesNotThrow(() -> catalog.repairTable(identifier));

// Verify table is back in JDBC catalog after repair
assertThat(catalog.listTables(databaseName)).contains(tableName);
assertDoesNotThrow(() -> catalog.getTable(identifier));
}

@Test
public void testRepairDatabase() throws Exception {
String databaseName = "repair_database";

// Create database and some tables
catalog.createDatabase(databaseName, false);
catalog.createTable(Identifier.create(databaseName, "table1"), DEFAULT_TABLE_SCHEMA, false);
catalog.createTable(Identifier.create(databaseName, "table2"), DEFAULT_TABLE_SCHEMA, false);

// Test repair database - should not throw exception and should work correctly
assertDoesNotThrow(() -> catalog.repairDatabase(databaseName));

// Verify tables still exist after repair
List<String> tables = catalog.listTables(databaseName);
assertThat(tables).containsExactlyInAnyOrder("table1", "table2");

// Test repair when database is missing from JDBC store
JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog;

// Remove database from JDBC store (this also removes tables)
JdbcUtils.execute(
jdbcCatalog.getConnections(),
JdbcUtils.DELETE_TABLES_SQL,
jdbcCatalog.getCatalogKey(),
databaseName);
JdbcUtils.execute(
jdbcCatalog.getConnections(),
JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL,
jdbcCatalog.getCatalogKey(),
databaseName);

// Verify database is missing from JDBC catalog
assertThat(catalog.listDatabases()).doesNotContain(databaseName);
assertThatThrownBy(() -> catalog.getDatabase(databaseName))
.isInstanceOf(Catalog.DatabaseNotExistException.class);

// Repair the database - should recreate database and tables in JDBC store
assertDoesNotThrow(() -> catalog.repairDatabase(databaseName));

// Verify database and tables are back in JDBC catalog after repair
assertThat(catalog.listDatabases()).contains(databaseName);
assertThat(catalog.listTables(databaseName)).containsExactlyInAnyOrder("table1", "table2");
assertDoesNotThrow(() -> catalog.getDatabase(databaseName));
}

@Test
public void testRepairDatabaseSystemDatabase() {
// System database should not be repairable
assertThatThrownBy(() -> catalog.repairDatabase("sys"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("sys");
}

@Test
public void testRepairDatabaseNotExists() throws Exception {
String nonExistentDb = "non_existent_db";

// Repairing a non-existent database should throw RuntimeException
assertThatThrownBy(() -> catalog.repairDatabase(nonExistentDb))
.isInstanceOf(RuntimeException.class);

// Database should not exist after failed repair
assertThat(catalog.listDatabases()).doesNotContain(nonExistentDb);
}

@Test
public void testRepairCatalog() throws Exception {
// Create multiple databases with tables
String[] databases = {"repair_db1", "repair_db2", "repair_db3"};

Schema schema =
new Schema(
Lists.newArrayList(
new DataField(0, "id", DataTypes.INT()),
new DataField(1, "data", DataTypes.STRING())),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyMap(),
"");

for (String dbName : databases) {
catalog.createDatabase(dbName, false);
catalog.createTable(Identifier.create(dbName, "test_table"), schema, false);
}

// Test repair entire catalog - should not throw exception
assertDoesNotThrow(() -> catalog.repairCatalog());

// Verify all databases and tables still exist
List<String> catalogDatabases = catalog.listDatabases();
for (String dbName : databases) {
assertThat(catalogDatabases).contains(dbName);
assertThat(catalog.listTables(dbName)).contains("test_table");
}
}

@Test
public void testInsertTableUtility() throws Exception {
String databaseName = "insert_test_db";
String tableName = "insert_test_table";

catalog.createDatabase(databaseName, false);

JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog;

// Test insertTable utility method
boolean result =
JdbcUtils.insertTable(
jdbcCatalog.getConnections(),
jdbcCatalog.getCatalogKey(),
databaseName,
tableName);

assertThat(result).isTrue();

// Try inserting the same table again - should throw exception for duplicate
assertThatThrownBy(
() ->
JdbcUtils.insertTable(
jdbcCatalog.getConnections(),
jdbcCatalog.getCatalogKey(),
databaseName,
tableName))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to insert table");
}
}
Loading