-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
90 lines (78 loc) · 2.85 KB
/
example.py
File metadata and controls
90 lines (78 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os

import requests
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, IntegerType
# Configuration
# Each setting may be overridden by an environment variable of the same name;
# when the variable is unset the default below is used. The defaults point at
# hosts named `catalog` and `minio`.
CATALOG_NAME = os.environ.get("CATALOG_NAME", "demo_catalog")
CATALOG_URI = os.environ.get("CATALOG_URI", "http://catalog:8000")  # Adjust port if needed (e.g. 8001)
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://minio:9000")
# Default credentials are MinIO's stock `minioadmin` pair; supply real ones
# through the environment rather than editing this file.
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "minioadmin")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "minioadmin")
# Derived from CATALOG_NAME unless overridden explicitly.
WAREHOUSE_LOCATION = os.environ.get(
    "WAREHOUSE_LOCATION", f"s3://warehouse/{CATALOG_NAME}"
)
def configure_catalog():
    """
    Configures the catalog server with S3 details and warehouse location.

    POSTs the S3 endpoint, access key, secret key and warehouse location as
    a JSON document to ``{CATALOG_URI}/v1/{CATALOG_NAME}/config/properties``
    and prints the decoded JSON reply.
    This is CRITICAL for the server to know where to write data.

    Raises:
        SystemExit: with exit code 1 when the request cannot be completed,
            the server answers with an HTTP error status, or the response
            body is not valid JSON.
    """
    print(f"--- Configuring Catalog '{CATALOG_NAME}' on Server ---")
    config_url = f"{CATALOG_URI}/v1/{CATALOG_NAME}/config/properties"
    config_payload = {
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "warehouse": WAREHOUSE_LOCATION,
    }
    try:
        # Bound the wait: without a timeout an unreachable server would make
        # the script hang indefinitely instead of reporting a failure.
        response = requests.post(config_url, json=config_payload, timeout=30)
        response.raise_for_status()
        server_reply = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decoding error is not a RequestException subclass.
        print(f"❌ Configuration failed: {e}")
        # SystemExit is raised directly because the `exit` builtin is only
        # injected by the `site` module and may be missing (e.g. `python -S`).
        raise SystemExit(1) from e
    print(f"✅ Configuration successful: {server_reply}")
def use_catalog():
    """
    Demonstrates using PyIceberg to interact with the configured catalog.

    Steps performed, each reported on stdout:
      1. Load the catalog named ``CATALOG_NAME`` from ``CATALOG_URI``.
      2. Create the namespace ``my_namespace`` (skipped if it already exists).
      3. Create the table ``my_namespace.my_table`` with a required integer
         ``id`` column and an optional string ``data`` column; if the table
         already exists it is loaded instead.
      4. List the tables in the namespace.

    Only "already exists" conditions are tolerated. Any other catalog error
    (connectivity, authorization, invalid request, ...) propagates to the
    caller instead of being reported as a possible duplicate.
    """
    print(f"\n--- Using PyIceberg with '{CATALOG_NAME}' ---")

    # Load the catalog
    # Note: We pass S3 config here for the CLIENT (PyIceberg) to use.
    catalog = load_catalog(
        CATALOG_NAME,
        **{
            "uri": CATALOG_URI,
            "prefix": CATALOG_NAME,
            "s3.endpoint": S3_ENDPOINT,
            "s3.access-key-id": S3_ACCESS_KEY,
            "s3.secret-access-key": S3_SECRET_KEY,
            "warehouse": WAREHOUSE_LOCATION,
        },
    )

    # 1. Create Namespace
    ns = "my_namespace"
    try:
        catalog.create_namespace(ns)
    except NamespaceAlreadyExistsError as e:
        # Re-running the demo is expected; an existing namespace is fine.
        print(f"ℹ️ Namespace '{ns}' already exists: {e}")
    else:
        print(f"✅ Created namespace: {ns}")

    # 2. Create Table
    table_name = f"{ns}.my_table"
    schema = Schema(
        NestedField(1, "id", IntegerType(), required=True),
        NestedField(2, "data", StringType(), required=False),
    )
    try:
        table = catalog.create_table(table_name, schema=schema)
    except TableAlreadyExistsError as e:
        # Fall back to the existing table so the rest of the demo still runs.
        print(f"ℹ️ Table '{table_name}' already exists: {e}")
        table = catalog.load_table(table_name)
        print(f"✅ Loaded existing table: {table_name}")
    else:
        print(f"✅ Created table: {table_name}")
        print(f" Location: {table.location()}")

    # 3. List Tables
    tables = catalog.list_tables(ns)
    print(f"✅ Tables in '{ns}': {tables}")
if __name__ == "__main__":
    # Run the demo steps in order:
    #   1. configure_catalog - configure the server (one-time setup per catalog)
    #   2. use_catalog       - use the catalog through PyIceberg
    for step in (configure_catalog, use_catalog):
        step()