diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 5d3ff33b3..7cf867c04 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -89,6 +89,7 @@ set(ICEBERG_SOURCES update/fast_append.cc update/pending_update.cc update/set_snapshot.cc + update/snapshot_manager.cc update/snapshot_update.cc update/update_location.cc update/update_partition_spec.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index fd82a889b..7dd3409ad 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -107,6 +107,7 @@ iceberg_sources = files( 'update/fast_append.cc', 'update/pending_update.cc', 'update/set_snapshot.cc', + 'update/snapshot_manager.cc', 'update/snapshot_update.cc', 'update/update_location.cc', 'update/update_partition_spec.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index b6c26ea00..cf081143c 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -32,6 +32,7 @@ #include "iceberg/table_scan.h" #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_partition_statistics.h" #include "iceberg/update/update_properties.h" @@ -222,6 +223,13 @@ Result> Table::NewUpdatePartitionStat return transaction->NewUpdatePartitionStatistics(); } +Result> Table::NewSnapshotManager() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return SnapshotManager::Make(std::move(transaction)); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 1f3135dd7..423911c21 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -168,6 +168,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a 
new FastAppend to append data files and commit the changes. virtual Result> NewFastAppend(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. + virtual Result> NewSnapshotManager(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index ae61d819f..d54d3caa4 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -178,7 +178,7 @@ if(ICEBERG_BUILD_BUNDLE) SOURCES expire_snapshots_test.cc fast_append_test.cc - set_snapshot_test.cc + snapshot_manager_test.cc transaction_test.cc update_location_test.cc update_partition_spec_test.cc diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc deleted file mode 100644 index 6bbd59b8e..000000000 --- a/src/iceberg/test/set_snapshot_test.cc +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "iceberg/update/set_snapshot.h" - -#include -#include - -#include "iceberg/result.h" -#include "iceberg/snapshot.h" -#include "iceberg/test/matchers.h" -#include "iceberg/test/update_test_base.h" -#include "iceberg/transaction.h" - -namespace iceberg { - -class SetSnapshotTest : public UpdateTestBase { - protected: - // Snapshot IDs from TableMetadataV2Valid.json - static constexpr int64_t kOldestSnapshotId = 3051729675574597004; - static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; - - // Timestamps from TableMetadataV2Valid.json - static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770; - static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770; -}; - -TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot); - - set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); - - ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); - EXPECT_EQ(snapshot_id, kOldestSnapshotId); - - // Commit and verify the change was persisted - EXPECT_THAT(set_snapshot->Commit(), IsOk()); - EXPECT_THAT(transaction->Commit(), IsOk()); - ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); -} - -TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId); - - ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); - EXPECT_EQ(snapshot_id, kCurrentSnapshotId); -} - -TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) { - ICEBERG_UNWRAP_OR_FAIL(auto 
transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - // Try to set to a non-existent snapshot - int64_t invalid_snapshot_id = 9999999999999999; - set_snapshot->SetCurrentSnapshot(invalid_snapshot_id); - - auto result = set_snapshot->Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("is not found")); -} - -TEST_F(SetSnapshotTest, RollbackToValid) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - // Rollback to the oldest snapshot (which is an ancestor) - set_snapshot->RollbackTo(kOldestSnapshotId); - - ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); - EXPECT_EQ(snapshot_id, kOldestSnapshotId); -} - -TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - // Try to rollback to a non-existent snapshot - int64_t invalid_snapshot_id = 9999999999999999; - set_snapshot->RollbackTo(invalid_snapshot_id); - - auto result = set_snapshot->Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); -} - -TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - // Rollback to a time between the two snapshots - // This should select the oldest snapshot - int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2; - set_snapshot->RollbackToTime(time_between); - - ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); - EXPECT_EQ(snapshot_id, kOldestSnapshotId); -} - -TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) { - 
ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - // Try to rollback to a time before any snapshot - int64_t time_before_all = kOldestSnapshotTimestamp - 1000000; - set_snapshot->RollbackToTime(time_before_all); - - // Should fail - no snapshot older than the specified time - auto result = set_snapshot->Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than")); -} - -TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - // Rollback to a timestamp just after the oldest snapshot - int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1; - set_snapshot->RollbackToTime(time_just_after_oldest); - - // Apply and verify - should return the oldest snapshot - ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); - EXPECT_EQ(snapshot_id, kOldestSnapshotId); -} - -TEST_F(SetSnapshotTest, ApplyWithoutChanges) { - ICEBERG_UNWRAP_OR_FAIL(auto transaction, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, transaction->NewSetSnapshot()); - ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); - EXPECT_EQ(snapshot_id, kCurrentSnapshotId); - - EXPECT_THAT(set_snapshot->Commit(), IsOk()); - EXPECT_THAT(transaction->Commit(), IsOk()); - ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId); -} - -} // namespace iceberg diff --git a/src/iceberg/test/snapshot_manager_test.cc b/src/iceberg/test/snapshot_manager_test.cc new file mode 100644 index 000000000..bb6b0c8ef --- /dev/null +++ b/src/iceberg/test/snapshot_manager_test.cc @@ -0,0 +1,542 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/snapshot_manager.h" + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" +#include "iceberg/update/fast_append.h" + +namespace iceberg { + +class SnapshotManagerTest : public UpdateTestBase { + protected: + void SetUp() override { + UpdateTestBase::SetUp(); + ExtractSnapshotIdsFromTable(); + } + + void ExtractSnapshotIdsFromTable() { + ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot()); + current_snapshot_id_ = current->snapshot_id; + ASSERT_FALSE(table_->snapshots().empty()); + oldest_snapshot_id_ = table_->snapshots().front()->snapshot_id; + } + + // Snapshot IDs from the main table (TableMetadataV2Valid.json) + int64_t current_snapshot_id_{}; + int64_t oldest_snapshot_id_{}; +}; + +class SnapshotManagerMinimalTableTest : public MinimalUpdateTestBase {}; + +TEST_F(SnapshotManagerTest, CreateBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + 
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); +} + +TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); +} + +TEST_F(SnapshotManagerMinimalTableTest, CreateBranchOnEmptyTable) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, minimal_table_->NewSnapshotManager()); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE( + reloaded->metadata()->refs.contains(std::string(SnapshotRef::kMainBranch))); + auto it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); +} + +TEST_F(SnapshotManagerMinimalTableTest, + CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, minimal_table_->NewSnapshotManager()); + manager->CreateBranch("branch1"); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto table_with_branch, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_with_branch->NewSnapshotManager()); + manager2->CreateBranch("branch1"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + 
EXPECT_THAT(result, HasErrorMessage("Ref branch1 already exists")); +} + +TEST_F(SnapshotManagerTest, CreateBranchFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + // Try to create a branch with an existing name + ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_->NewSnapshotManager()); + manager2->CreateBranch("branch1", current_snapshot_id_); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("branch 'branch1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, CreateTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kTag); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); +} + +TEST_F(SnapshotManagerTest, CreateTagFailsWhenRefAlreadyExists) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + // Try to create a tag with an existing name + ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_->NewSnapshotManager()); + manager2->CreateTag("tag1", current_snapshot_id_); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); + EXPECT_THAT(result, HasErrorMessage("tag 'tag1' was created concurrently")); +} + +TEST_F(SnapshotManagerTest, RemoveBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + 
{ + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->RemoveBranch("branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE(reloaded->metadata()->refs.contains("branch1")); + } +} + +TEST_F(SnapshotManagerTest, RemovingNonExistingBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RemoveBranch("non-existing"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Branch does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, RemovingMainBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RemoveBranch(std::string(SnapshotRef::kMainBranch)); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot remove main branch")); +} + +TEST_F(SnapshotManagerTest, RemoveTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->RemoveTag("tag1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_FALSE(reloaded->metadata()->refs.contains("tag1")); + } +} + +TEST_F(SnapshotManagerTest, RemovingNonExistingTagFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RemoveTag("non-existing"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, 
HasErrorMessage("Tag does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", oldest_snapshot_id_); + manager->CreateBranch("branch2", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->ReplaceBranch("branch1", "branch2"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + ASSERT_NE(ref, nullptr); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); + } +} + +TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingToBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->ReplaceBranch("branch1", "non-existing"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceBranchNonExistingFromBranchCreatesTheBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->ReplaceBranch("new-branch", "branch1"); + 
EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("new-branch"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); + } +} + +TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingFromBranchCreatesTheBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->FastForwardBranch("new-branch", "branch1"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("new-branch"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); + } +} + +TEST_F(SnapshotManagerTest, FastForwardBranchNonExistingToFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->FastForwardBranch("branch1", "non-existing"); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref does not exist: non-existing")); +} + +TEST_F(SnapshotManagerTest, ReplaceTag) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", 
current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->ReplaceTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + ASSERT_NE(ref, nullptr); + EXPECT_EQ(ref->snapshot_id, current_snapshot_id_); + } +} + +TEST_F(SnapshotManagerTest, UpdatingBranchRetention) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->SetMinSnapshotsToKeep("branch1", 10); + manager2->SetMaxSnapshotAgeMs("branch1", 20000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->type(), SnapshotRefType::kBranch); + const auto& branch = std::get(ref->retention); + EXPECT_EQ(branch.max_snapshot_age_ms, 20000); + EXPECT_EQ(branch.min_snapshots_to_keep, 10); + } +} + +TEST_F(SnapshotManagerTest, SettingBranchRetentionOnTagFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->SetMinSnapshotsToKeep("tag1", 10); 
+ auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref 'tag1' is a tag not a branch")); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->SetMaxSnapshotAgeMs("tag1", 10); + auto result = manager2->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Ref 'tag1' is a tag not a branch")); + } +} + +TEST_F(SnapshotManagerTest, UpdatingBranchMaxRefAge) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->SetMaxRefAgeMs("branch1", 10000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + EXPECT_EQ(ref->max_ref_age_ms(), 10000); + } +} + +TEST_F(SnapshotManagerTest, UpdatingTagMaxRefAge) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->SetMaxRefAgeMs("tag1", 10000); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it = reloaded->metadata()->refs.find("tag1"); + ASSERT_NE(it, reloaded->metadata()->refs.end()); + auto ref = it->second; + 
EXPECT_EQ(ref->max_ref_age_ms(), 10000); + } +} + +TEST_F(SnapshotManagerTest, RenameBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->RenameBranch("branch1", "branch2"); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + auto it1 = reloaded->metadata()->refs.find("branch1"); + EXPECT_EQ(it1, reloaded->metadata()->refs.end()); + + auto it2 = reloaded->metadata()->refs.find("branch2"); + ASSERT_NE(it2, reloaded->metadata()->refs.end()); + auto ref2 = it2->second; + EXPECT_EQ(ref2->snapshot_id, current_snapshot_id_); + } +} + +TEST_F(SnapshotManagerTest, FailRenamingMainBranch) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RenameBranch(std::string(SnapshotRef::kMainBranch), "some-branch"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot rename main branch")); +} + +TEST_F(SnapshotManagerTest, RenamingNonExistingBranchFails) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RenameBranch("some-missing-branch", "some-branch"); + auto result = manager->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Branch does not exist: some-missing-branch")); +} + +TEST_F(SnapshotManagerTest, RollbackTo) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->RollbackTo(oldest_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, 
reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); +} + +TEST_F(SnapshotManagerTest, SetCurrentSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->SetCurrentSnapshot(oldest_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); +} + +TEST_F(SnapshotManagerTest, CreateReferencesAndRollback) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + manager->CreateBranch("branch1", current_snapshot_id_); + manager->CreateTag("tag1", current_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto manager2, reloaded->NewSnapshotManager()); + manager2->RollbackTo(oldest_snapshot_id_); + EXPECT_THAT(manager2->Commit(), IsOk()); + } + + { + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); + + auto branch_it = reloaded->metadata()->refs.find("branch1"); + ASSERT_NE(branch_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(branch_it->second->snapshot_id, current_snapshot_id_); + + auto tag_it = reloaded->metadata()->refs.find("tag1"); + ASSERT_NE(tag_it, reloaded->metadata()->refs.end()); + EXPECT_EQ(tag_it->second->snapshot_id, current_snapshot_id_); + } +} + +TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn)); + + manager->RollbackTo(oldest_snapshot_id_); + EXPECT_THAT(txn->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, 
catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); +} + +TEST_F(SnapshotManagerTest, SnapshotManagerFromTableAllowsMultipleSnapshotOperations) { + ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager()); + + manager->SetCurrentSnapshot(oldest_snapshot_id_); + manager->SetCurrentSnapshot(current_snapshot_id_); + manager->RollbackTo(oldest_snapshot_id_); + EXPECT_THAT(manager->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); +} + +TEST_F(SnapshotManagerTest, + SnapshotManagerFromTransactionAllowsMultipleSnapshotOperations) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn)); + + manager->SetCurrentSnapshot(oldest_snapshot_id_); + manager->SetCurrentSnapshot(current_snapshot_id_); + manager->RollbackTo(oldest_snapshot_id_); + EXPECT_THAT(txn->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_); +} + +} // namespace iceberg diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h index c14cb76b9..d3d245d1f 100644 --- a/src/iceberg/test/update_test_base.h +++ b/src/iceberg/test/update_test_base.h 
@@ -85,4 +85,52 @@ class UpdateTestBase : public ::testing::Test { std::shared_ptr
table_;
 };
+// Fixture that registers a table built from minimal metadata in an in-memory catalog.
+class MinimalUpdateTestBase : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    ASSERT_NO_FATAL_FAILURE(InitializeFileIO());  // skip registration if setup failed
+    RegisterMinimalTableFromResource("TableMetadataV2ValidMinimal.json");
+  }
+
+  /// \brief Create the mock file IO and catalog, then the table's metadata directory.
+  void InitializeFileIO() {
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    catalog_ =
+        InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", /*properties=*/{});
+
+    // Arrow MockFS cannot automatically create directories.
+    auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
+        static_cast(*file_io_).fs());
+    ASSERT_TRUE(arrow_fs != nullptr);
+    ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
+  }
+
+  /// \brief Write the resource's metadata under the table location and register it.
+  ///
+  /// \param resource_name The name of the metadata resource file
+  void RegisterMinimalTableFromResource(const std::string& resource_name) {
+    // Best-effort drop of a previous registration; the result is ignored on purpose.
+    std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
+
+    // Write table metadata to the table location.
+    auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
+                                         table_location_, Uuid::GenerateV7().ToString());
+    ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name));
+    metadata->location = table_location_;
+    ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata),
+                IsOk());
+
+    // Register the table in the catalog.
+    ICEBERG_UNWRAP_OR_FAIL(minimal_table_,
+                           catalog_->RegisterTable(table_ident_, metadata_location));
+  }
+
+  const TableIdentifier table_ident_{.name = "minimal_table"};
+  const std::string table_location_{"/warehouse/minimal_table"};
+  std::shared_ptr file_io_;
+  std::shared_ptr catalog_;
+  std::shared_ptr
minimal_table_; +}; + } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index b24aa0da3..4342d5238 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -36,6 +36,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/set_snapshot.h" +#include "iceberg/update/snapshot_manager.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_location.h" #include "iceberg/update/update_partition_spec.h" @@ -62,9 +63,7 @@ Transaction::~Transaction() = default; Result> Transaction::Make(std::shared_ptr
table, Kind kind, bool auto_commit) { - if (!table || !table->catalog()) [[unlikely]] { - return InvalidArgument("Table and catalog cannot be null"); - } + ICEBERG_PRECHECK(table && table->catalog(), "Table and catalog cannot be null"); std::unique_ptr metadata_builder; if (kind == Kind::kCreate) { @@ -93,9 +92,11 @@ std::string Transaction::MetadataFileLocation(std::string_view filename) const { } Status Transaction::AddUpdate(const std::shared_ptr& update) { - if (!last_update_committed_) { - return InvalidArgument("Cannot add update when previous update is not committed"); - } + ICEBERG_PRECHECK(update->kind() != PendingUpdate::Kind::kSnapshotManager, + "SnapshotManager should not be added to the transaction"); + ICEBERG_CHECK(last_update_committed_, + "Cannot add update when previous update is not committed"); + pending_updates_.emplace_back(std::weak_ptr(update)); last_update_committed_ = false; return {}; @@ -301,13 +302,9 @@ Status Transaction::ApplyUpdatePartitionStatistics(UpdatePartitionStatistics& up } Result> Transaction::Commit() { - if (committed_) { - return Invalid("Transaction already committed"); - } - if (!last_update_committed_) { - return InvalidArgument( - "Cannot commit transaction when previous update is not committed"); - } + ICEBERG_CHECK(!committed_, "Transaction already committed"); + ICEBERG_CHECK(last_update_committed_, + "Cannot commit transaction when previous update is not committed"); const auto& updates = metadata_builder_->changes(); if (updates.empty()) { @@ -428,4 +425,9 @@ Transaction::NewUpdateSnapshotReference() { return update_ref; } +Result> Transaction::NewSnapshotManager() { + // SnapshotManager has its own commit logic, so it is not added to the pending updates. 
+ return SnapshotManager::Make(shared_from_this()); +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index e975be7ff..c083a3bf4 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -55,6 +55,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> Commit(); + /// \brief Returns true if this transaction has been committed. + bool is_committed() const { return committed_; } + /// \brief Create a new UpdatePartitionSpec to update the partition spec of this table /// and commit the changes. Result> NewUpdatePartitionSpec(); @@ -94,16 +103,11 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewUpdateLocation(); - /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a - /// previous snapshot and commit the changes. - Result> NewSetSnapshot(); - /// \brief Create a new FastAppend to append data files and commit the changes. Result> NewFastAppend(); - /// \brief Create a new UpdateSnapshotReference to update snapshot references (branches - /// and tags) and commit the changes. - Result> NewUpdateSnapshotReference(); + /// \brief Create a new SnapshotManager to manage snapshots. + Result> NewSnapshotManager(); private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit,
@@ -111,6 +115,14 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this& update);
+  /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
+  /// previous snapshot and commit the changes.
+  Result> NewSetSnapshot();
+
+  /// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
+  /// and tags) and commit the changes.
+  Result> NewUpdateSnapshotReference();
+
   /// \brief Apply the pending changes to current table.
   Status Apply(PendingUpdate& updates);
@@ -129,6 +141,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this table_;
@@ -136,7 +149,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this
+#include
+#include
+
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/set_snapshot.h"
+#include "iceberg/update/update_snapshot_reference.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+/// Scope guard: on destruction it enables auto-commit on the transaction when
+/// `auto_commit` is true and disables it otherwise.
+class AutoCommitGuard {
+ public:
+  AutoCommitGuard(std::shared_ptr transaction, bool auto_commit)
+      : transaction_(std::move(transaction)), auto_commit_(auto_commit) {}
+
+  // The destructor has a side effect on the transaction, so a copy of the guard would
+  // apply it a second time; forbid copying (this also suppresses implicit moves).
+  AutoCommitGuard(const AutoCommitGuard&) = delete;
+  AutoCommitGuard& operator=(const AutoCommitGuard&) = delete;
+
+  ~AutoCommitGuard() {
+    if (auto_commit_) {
+      transaction_->EnableAutoCommit();
+    } else {
+      transaction_->DisableAutoCommit();
+    }
+  }
+
+ private:
+  std::shared_ptr transaction_;
+  bool auto_commit_;
+};
+
+}  // namespace
+
+/// Rejects a null transaction, otherwise wraps it in a new SnapshotManager.
+Result> SnapshotManager::Make(
+    std::shared_ptr transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null");
+  return std::shared_ptr(new SnapshotManager(std::move(transaction)));
+}
+
+/// Remembers the transaction's auto-commit setting, then turns auto-commit off.
+SnapshotManager::SnapshotManager(std::shared_ptr transaction)
+    : PendingUpdate(std::move(transaction)),
+      original_auto_commit_(transaction_->auto_commit_) {
+  transaction_->DisableAutoCommit();
+}
+
+SnapshotManager::~SnapshotManager() =
default;
+
+/// Not implemented yet: the check below always fails.
+SnapshotManager& SnapshotManager::Cherrypick([[maybe_unused]] int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  // TODO(anyone): Implement cherrypick operation
+  ICEBERG_BUILDER_CHECK(false, "Cherrypick operation not yet implemented");
+  return *this;
+}
+
+/// Flushes staged reference updates, then runs a SetSnapshot update that makes
+/// `snapshot_id` the current snapshot.
+SnapshotManager& SnapshotManager::SetCurrentSnapshot(int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot());
+  set_snapshot->SetCurrentSnapshot(snapshot_id);
+  ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
+  return *this;
+}
+
+/// Flushes staged reference updates, then runs a SetSnapshot rollback to `timestamp_ms`.
+SnapshotManager& SnapshotManager::RollbackToTime(int64_t timestamp_ms) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot());
+  set_snapshot->RollbackToTime(timestamp_ms);
+  ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
+  return *this;
+}
+
+/// Flushes staged reference updates, then runs a SetSnapshot rollback to `snapshot_id`.
+SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto set_snapshot, transaction_->NewSetSnapshot());
+  set_snapshot->RollbackTo(snapshot_id);
+  ICEBERG_BUILDER_RETURN_IF_ERROR(set_snapshot->Commit());
+  return *this;
+}
+
+/// Creates branch `name` on the current snapshot when there is one; otherwise commits a
+/// fast append that targets the new branch.
+SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) {
+  if (base().current_snapshot_id != kInvalidSnapshotId) {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base().Snapshot());
+    ICEBERG_DCHECK(current_snapshot != nullptr, "Current snapshot should not be null");
+    return CreateBranch(name, current_snapshot->snapshot_id);
+  }
+  // Flush staged reference updates before starting another update on the transaction,
+  // as the other snapshot-producing operations in this class do. NOTE(review): the
+  // transaction appears to reject a new update while a previous one is uncommitted.
+  ICEBERG_BUILDER_RETURN_IF_ERROR(CommitIfRefUpdatesExist());
+  ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name);
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, transaction_->NewFastAppend());
+  ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->SetTargetBranch(name).Commit());
+  return
*this;
+}
+
+// The operations below only stage work on the shared UpdateSnapshotReference update;
+// nothing is committed until a snapshot operation or Commit() flushes it.
+
+SnapshotManager& SnapshotManager::CreateBranch(const std::string& name,
+                                               int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->CreateBranch(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::CreateTag(const std::string& name,
+                                            int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->CreateTag(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RemoveBranch(const std::string& name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->RemoveBranch(name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::RemoveTag(const std::string& name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->RemoveTag(name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceTag(const std::string& name,
+                                             int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->ReplaceTag(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& name,
+                                                int64_t snapshot_id) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->ReplaceBranch(name, snapshot_id);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::ReplaceBranch(const std::string& from,
+                                                const std::string& to) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->ReplaceBranch(from, to);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::FastForwardBranch(const std::string& from,
+                                                    const std::string& to) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->FastForward(from, to);
+  return *this;
+}
+
+SnapshotManager&
SnapshotManager::RenameBranch(const std::string& name,
+                                               const std::string& new_name) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->RenameBranch(name, new_name);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMinSnapshotsToKeep(const std::string& branch_name,
+                                                        int32_t min_snapshots_to_keep) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->SetMinSnapshotsToKeep(branch_name, min_snapshots_to_keep);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMaxSnapshotAgeMs(const std::string& branch_name,
+                                                      int64_t max_snapshot_age_ms) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->SetMaxSnapshotAgeMs(branch_name, max_snapshot_age_ms);
+  return *this;
+}
+
+SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
+                                                 int64_t max_ref_age_ms) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto ref_ops, UpdateSnapshotReferencesOperation());
+  ref_ops->SetMaxRefAgeMs(name, max_ref_age_ms);
+  return *this;
+}
+
+/// Enables auto-commit for the duration of the call (the guard puts the original
+/// setting back on every exit path), reports accumulated errors, flushes staged
+/// reference updates, and commits the transaction unless it is already committed.
+Status SnapshotManager::Commit() {
+  AutoCommitGuard restore_auto_commit(transaction_, original_auto_commit_);
+  transaction_->EnableAutoCommit();
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
+  if (transaction_->is_committed()) {
+    return {};
+  }
+  ICEBERG_RETURN_UNEXPECTED(transaction_->Commit());
+  return {};
+}
+
+/// Returns the staged reference update, creating it from the transaction on first use.
+Result>
+SnapshotManager::UpdateSnapshotReferencesOperation() {
+  if (update_snap_refs_ != nullptr) {
+    return update_snap_refs_;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(update_snap_refs_,
+                          transaction_->NewUpdateSnapshotReference());
+  return update_snap_refs_;
+}
+
+/// Commits the staged reference update, if any, and forgets it once that succeeds.
+Status SnapshotManager::CommitIfRefUpdatesExist() {
+  if (update_snap_refs_ == nullptr) {
+    return {};
+  }
+  ICEBERG_RETURN_UNEXPECTED(update_snap_refs_->Commit());
+  update_snap_refs_.reset();
+  return {};
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/snapshot_manager.h
b/src/iceberg/update/snapshot_manager.h
new file mode 100644
index 000000000..405f2ca00
--- /dev/null
+++ b/src/iceberg/update/snapshot_manager.h
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+namespace iceberg {
+
+/// \brief API for managing snapshots and snapshot references.
+///
+/// Allows rolling table data back to the state of an older snapshot, cherry-picking
+/// snapshots, and managing branches and tags.
+class ICEBERG_EXPORT SnapshotManager : public PendingUpdate {
+ public:
+  /// \brief Create a SnapshotManager from an existing transaction.
+  ///
+  /// \param transaction The transaction to use; must not be null
+  /// \return A new SnapshotManager instance, or an error if the transaction is null
+  static Result> Make(
+      std::shared_ptr transaction);
+
+  ~SnapshotManager() override;
+
+  Kind kind() const final { return Kind::kSnapshotManager; }
+
+  /// \brief Apply supported changes in given snapshot and create a new snapshot which
+  /// will be set as the current snapshot on commit. Not yet implemented.
+  ///
+  /// \param snapshot_id A Snapshot ID whose changes to apply
+  /// \return Reference to this for method chaining
+  SnapshotManager& Cherrypick(int64_t snapshot_id);
+
+  /// \brief Set this table's current snapshot to a specific Snapshot ID.
+  ///
+  /// \param snapshot_id Snapshot ID to make the current snapshot
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetCurrentSnapshot(int64_t snapshot_id);
+
+  /// \brief Roll this table's data back to the last snapshot before the given timestamp.
+  ///
+  /// \param timestamp_ms A timestamp in milliseconds
+  /// \return Reference to this for method chaining
+  SnapshotManager& RollbackToTime(int64_t timestamp_ms);
+
+  /// \brief Roll the table's state back to a specific Snapshot ID.
+  ///
+  /// \param snapshot_id Snapshot ID to roll back table to. Must be an ancestor
+  /// of the current snapshot
+  /// \return Reference to this for method chaining
+  SnapshotManager& RollbackTo(int64_t snapshot_id);
+
+  /// \brief Create a new branch. The branch will point to the current snapshot if the
+  /// table has one. Otherwise, the branch will point to a newly created empty
+  /// snapshot.
+  ///
+  /// \param name Branch name
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateBranch(const std::string& name);
+
+  /// \brief Create a new branch pointing to the given Snapshot ID.
+  ///
+  /// \param name Branch name
+  /// \param snapshot_id Snapshot ID which will be the head of the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Create a new tag pointing to the given Snapshot ID.
+  ///
+  /// \param name Tag name
+  /// \param snapshot_id Snapshot ID for the head of the new tag
+  /// \return Reference to this for method chaining
+  SnapshotManager& CreateTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Remove a branch by name.
+  ///
+  /// \param name Branch name
+  /// \return Reference to this for method chaining
+  SnapshotManager& RemoveBranch(const std::string& name);
+
+  /// \brief Remove the tag with the given name.
+  ///
+  /// \param name Tag name
+  /// \return Reference to this for method chaining
+  SnapshotManager& RemoveTag(const std::string& name);
+
+  /// \brief Replaces the tag with the given name to point to the specified snapshot.
+  ///
+  /// \param name Tag to replace
+  /// \param snapshot_id New Snapshot ID for the given tag
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replaces the branch with the given name to point to the specified snapshot.
+  ///
+  /// \param name Branch to replace
+  /// \param snapshot_id New Snapshot ID for the given branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replaces the `from` branch to point to the `to` snapshot. The `to` will
+  /// remain unchanged, and `from` branch will retain its retention properties. If the
+  /// `from` branch does not exist, it will be created with default retention properties.
+  ///
+  /// \param from Branch to replace
+  /// \param to The branch whose snapshot `from` should be replaced with
+  /// \return Reference to this for method chaining
+  SnapshotManager& ReplaceBranch(const std::string& from, const std::string& to);
+
+  /// \brief Performs a fast-forward of `from` up to the `to` snapshot if `from` is an
+  /// ancestor of `to`. The `to` will remain unchanged, and `from` will retain its
+  /// retention properties. If the `from` branch does not exist, it will be created with
+  /// default retention properties.
+  ///
+  /// \param from Branch to fast-forward
+  /// \param to Ref that the `from` branch is fast-forwarded to
+  /// \return Reference to this for method chaining
+  SnapshotManager& FastForwardBranch(const std::string& from, const std::string& to);
+
+  /// \brief Rename a branch.
+  ///
+  /// \param name Name of branch to rename
+  /// \param new_name The desired new name of the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& RenameBranch(const std::string& name, const std::string& new_name);
+
+  /// \brief Updates the minimum number of snapshots to keep for a branch.
+  ///
+  /// \param branch_name Branch name
+  /// \param min_snapshots_to_keep Minimum number of snapshots to retain on the branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMinSnapshotsToKeep(const std::string& branch_name,
+                                         int32_t min_snapshots_to_keep);
+
+  /// \brief Updates the max snapshot age for a branch.
+  ///
+  /// \param branch_name Branch name
+  /// \param max_snapshot_age_ms Maximum snapshot age in milliseconds to retain on branch
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMaxSnapshotAgeMs(const std::string& branch_name,
+                                       int64_t max_snapshot_age_ms);
+
+  /// \brief Updates the retention policy for a reference.
+  ///
+  /// \param name Reference name
+  /// \param max_ref_age_ms Retention age in milliseconds of the reference itself
+  /// \return Reference to this for method chaining
+  SnapshotManager& SetMaxRefAgeMs(const std::string& name, int64_t max_ref_age_ms);
+
+  /// \brief Commit staged reference updates and the transaction if not yet committed.
+  ///
+  /// \return Status indicating success or failure
+  Status Commit() override;
+
+ private:
+  /// \brief Constructor for creating a SnapshotManager with a transaction.
+  ///
+  /// \param transaction The transaction to use
+  explicit SnapshotManager(std::shared_ptr transaction);
+
+  /// \brief Get the staged UpdateSnapshotReference operation, creating it on first use.
+  Result> UpdateSnapshotReferencesOperation();
+
+  /// \brief Commit any pending reference updates if they exist.
+  Status CommitIfRefUpdatesExist();
+
+  bool original_auto_commit_;
+  std::shared_ptr update_snap_refs_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc
index 38c5129f4..682d5f889 100644
--- a/src/iceberg/update/snapshot_update.cc
+++ b/src/iceberg/update/snapshot_update.cc
@@ -331,20 +331,6 @@ Status SnapshotUpdate::Finalize(std::optional commit_error) {
   return {};
 }
 
-Status SnapshotUpdate::SetTargetBranch(const std::string& branch) {
-  ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be empty");
-
-  if (auto ref_it = base().refs.find(branch); ref_it != base().refs.end()) {
-    ICEBERG_PRECHECK(
-        ref_it->second->type() == SnapshotRefType::kBranch,
-        "{} is a tag, not a branch. Tags cannot be targets for producing snapshots",
-        branch);
-  }
-
-  target_branch_ = branch;
-  return {};
-}
-
 Result> SnapshotUpdate::ComputeSummary(
     const TableMetadata& previous) {
   std::unordered_map summary = Summary();
diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h
index 12c3b19dc..fdbb2660d 100644
--- a/src/iceberg/update/snapshot_update.h
+++ b/src/iceberg/update/snapshot_update.h
@@ -76,6 +76,28 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
     return self;
   }
 
+  /// \brief Select the branch that this update produces its snapshot on.
+  ///
+  /// \param branch Name of a branch reference; must be non-empty and must not be a tag
+  /// \return Reference to this for method chaining
+  auto& SetTargetBranch(this auto& self, const std::string& branch) {
+    if (branch.empty()) [[unlikely]] {
+      return self.AddError(ErrorKind::kInvalidArgument, "Branch name cannot be empty");
+    }
+
+    const auto& known_refs = self.base().refs;
+    if (const auto hit = known_refs.find(branch);
+        hit != known_refs.end() && hit->second->type() != SnapshotRefType::kBranch) {
+      return self.AddError(ErrorKind::kInvalidArgument,
+                           "{} is a tag, not a branch. Tags cannot be targets for "
+                           "producing snapshots",
+                           branch);
+    }
+
+    self.target_branch_ = branch;
+    return self;
+  }
+
   /// \brief Set a summary property.
   ///
   /// \param property The property name
@@ -121,7 +143,6 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
       std::span> files,
       const std::shared_ptr& spec);
 
-  Status SetTargetBranch(const std::string& branch);
   const std::string& target_branch() const { return target_branch_; }
   bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; }
   const std::string& commit_uuid() const { return commit_uuid_; }