Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9efcd6d
refactor: use Optional<ConsistencyLevel>
bevzzz Feb 9, 2026
df181cf
wip(batch): add batch primitives (TaskHandle, Message, BatchContext
bevzzz Feb 9, 2026
2db0147
feat(batch): start a new batch from CollectionHandle
bevzzz Feb 10, 2026
5af3cb5
feat(batch): implement state transitions and event handling
bevzzz Feb 11, 2026
4dad5e0
chore(batch): remove old comment
bevzzz Feb 11, 2026
7e510dd
feat(batch): implement graceful shutdown and abort
bevzzz Feb 13, 2026
7e22f39
wip(batch): implement shutdown policies
bevzzz Feb 16, 2026
abb2d78
feat(batch): implement reconnect and shutdown policies
bevzzz Feb 17, 2026
3bd2424
chore(batch): fix type Messeger -> Messenger
bevzzz Feb 17, 2026
825bd65
fix(batch): perform OOM reconnect sequence via BaseState, not BatchCo…
bevzzz Feb 18, 2026
d60ed54
feat(batch): reconnect to GCP every 160 seconds
bevzzz Feb 18, 2026
c0c5898
fix(it): automatically pick up the latest container version
bevzzz Feb 16, 2026
55ff367
ci(test): add v1.36.0-rc.0 to the testing matrix
bevzzz Feb 16, 2026
64d4998
chore(pom.xml): update dependencies
bevzzz Feb 18, 2026
f29a274
test(batch): add the '10_000 objects' integration test
bevzzz Feb 18, 2026
c5ea8bd
fix(batch): remove redundat parameter from Event.Started
bevzzz Feb 18, 2026
1a4645d
fix(batch): use API compatible w/ JDK 17
bevzzz Feb 18, 2026
270f726
fix(batch): add OPENED state for when the stream hasn't been started
bevzzz Feb 18, 2026
9f28126
fix(batch): replace of -> ofNullable
bevzzz Feb 18, 2026
101936b
fix(batch): create empty EnumSet via noneOf
bevzzz Feb 18, 2026
7c1e2bc
fix(batch): start the context before returning it
bevzzz Feb 18, 2026
d1d497d
fix(batch): handle happy path
bevzzz Feb 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ jobs:
strategy:
fail-fast: false
matrix:
WEAVIATE_VERSION: ["1.32.24", "1.33.11", "1.34.7", "1.35.2"]
WEAVIATE_VERSION:
["1.32.24", "1.33.11", "1.34.7", "1.35.2", "1.36.0-rc.0"]
steps:
- uses: actions/checkout@v4

Expand Down
14 changes: 7 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@
<lang3.version>3.20.0</lang3.version>
<junit.version>4.13.2</junit.version>
<testcontainers.version>2.0.3</testcontainers.version>
<assertj-core.version>3.27.6</assertj-core.version>
<assertj-core.version>3.27.7</assertj-core.version>
<jparams.version>1.0.4</jparams.version>
<mockito.version>5.21.0</mockito.version>
<slf4j.version>2.0.17</slf4j.version>
<logback.version>1.5.18</logback.version>
<mock-server.version>5.14.0</mock-server.version>
<jackson.version>2.21</jackson.version>
<oauth2-oidc-sdk.version>11.31.1</oauth2-oidc-sdk.version>
<oauth2-oidc-sdk.version>11.33</oauth2-oidc-sdk.version>
<mock-server.version>5.15.0</mock-server.version>
<protobuf.java.version>4.33.4</protobuf.java.version>
<protobuf.java-util.version>4.33.4</protobuf.java-util.version>
<grpc-netty-shaded.version>1.78.0</grpc-netty-shaded.version>
<grpc-protobuf.version>1.78.0</grpc-protobuf.version>
<grpc-stub.version>1.78.0</grpc-stub.version>
<protobuf.java.version>4.33.5</protobuf.java.version>
<protobuf.java-util.version>4.33.5</protobuf.java-util.version>
<grpc-netty-shaded.version>1.79.0</grpc-netty-shaded.version>
<grpc-protobuf.version>1.79.0</grpc-protobuf.version>
<grpc-stub.version>1.79.0</grpc-stub.version>
<annotations-api.version>6.0.53</annotations-api.version>
</properties>

Expand Down
17 changes: 15 additions & 2 deletions src/it/java/io/weaviate/containers/Weaviate.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

public class Weaviate extends WeaviateContainer {
public static final String DOCKER_IMAGE = "semitechnologies/weaviate";
public static final String LATEST_VERSION = Version.V135.semver.toString();
public static final String LATEST_VERSION = Version.latest().semver.toString();
public static final String VERSION;

static {
Expand All @@ -41,17 +41,30 @@ public enum Version {
V132(1, 32, 24),
V133(1, 33, 11),
V134(1, 34, 7),
V135(1, 35, 2);
V135(1, 35, 2),
V136(1, 36, "0-rc.0");

public final SemanticVersion semver;

private Version(int major, int minor, int patch) {
this.semver = new SemanticVersion(major, minor, patch);
}

private Version(int major, int minor, String patch) {
this.semver = new SemanticVersion(major, minor, patch);
}

public void orSkip() {
ConcurrentTest.requireAtLeast(this);
}

public static Version latest() {
Version[] versions = Version.class.getEnumConstants();
if (versions == null) {
throw new IllegalStateException("No versions are defined");
}
return versions[versions.length - 1];
}
}

/**
Expand Down
48 changes: 48 additions & 0 deletions src/it/java/io/weaviate/integration/BatchITest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.weaviate.integration;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;

import io.weaviate.ConcurrentTest;
import io.weaviate.client6.v1.api.WeaviateClient;
import io.weaviate.client6.v1.api.collections.Property;
import io.weaviate.client6.v1.api.collections.WeaviateObject;
import io.weaviate.containers.Container;
import io.weaviate.containers.Weaviate;

public class BatchITest extends ConcurrentTest {
private static final WeaviateClient client = Container.WEAVIATE.getClient();

@BeforeClass
public static void __() {
Weaviate.Version.V136.orSkip();
}

@Test
public void test() throws IOException {
var nsThings = ns("Things");

var things = client.collections.create(
nsThings,
c -> c.properties(Property.text("letter")));

// Act
try (var batch = things.batch.start()) {
for (int i = 0; i < 10_000; i++) {
String uuid = UUID.randomUUID().toString();
batch.add(WeaviateObject.of(builder -> builder
.uuid(uuid)
.properties(Map.of("letter", uuid.substring(0, 1)))));
}
} catch (InterruptedException e) {
}

// Assert
Assertions.assertThat(things.size()).isEqualTo(10_000);
}
}
18 changes: 4 additions & 14 deletions src/main/java/io/weaviate/client6/v1/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.weaviate.client6.v1.internal.ObjectBuilder;
import io.weaviate.client6.v1.internal.Timeout;
import io.weaviate.client6.v1.internal.TokenProvider;
import io.weaviate.client6.v1.internal.TransportOptions;
import io.weaviate.client6.v1.internal.grpc.GrpcChannelOptions;
import io.weaviate.client6.v1.internal.rest.RestTransportOptions;

Expand Down Expand Up @@ -181,26 +182,15 @@ public SelfT timeout(int initSeconds, int querySeconds, int insertSeconds) {
private static final String HEADER_X_WEAVIATE_CLUSTER_URL = "X-Weaviate-Cluster-URL";
private static final String HEADER_X_WEAVIATE_CLIENT = "X-Weaviate-Client";

/**
* isWeaviateDomain returns true if the host matches weaviate.io,
* semi.technology, or weaviate.cloud domain.
*/
private static boolean isWeaviateDomain(String host) {
var lower = host.toLowerCase();
return lower.contains("weaviate.io") ||
lower.contains("semi.technology") ||
lower.contains("weaviate.cloud");
}

private static final String VERSION = "weaviate-client-java/"
+ ((!BuildInfo.TAGS.isBlank() && BuildInfo.TAGS != "unknown") ? BuildInfo.TAGS
: (BuildInfo.BRANCH + "-" + BuildInfo.COMMIT_ID_ABBREV));
+ ((!BuildInfo.TAGS.isBlank() && BuildInfo.TAGS != "unknown") ? BuildInfo.TAGS
: (BuildInfo.BRANCH + "-" + BuildInfo.COMMIT_ID_ABBREV));

@Override
public Config build() {
// For clusters hosted on Weaviate Cloud, Weaviate Embedding Service
// will be available under the same domain.
if (isWeaviateDomain(httpHost) && authentication != null) {
if (TransportOptions.isWeaviateDomain(httpHost) && authentication != null) {
setHeader(HEADER_X_WEAVIATE_CLUSTER_URL, "https://" + httpHost + ":" + httpPort);
}
setHeader(HEADER_X_WEAVIATE_CLIENT, VERSION);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.weaviate.client6.v1.api.collections;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Function;

import io.weaviate.client6.v1.api.collections.aggregate.WeaviateAggregateClient;
import io.weaviate.client6.v1.api.collections.batch.WeaviateBatchClient;
import io.weaviate.client6.v1.api.collections.config.WeaviateConfigClient;
import io.weaviate.client6.v1.api.collections.data.WeaviateDataClient;
import io.weaviate.client6.v1.api.collections.generate.WeaviateGenerateClient;
Expand All @@ -23,6 +25,7 @@ public class CollectionHandle<PropertiesT> {
public final WeaviateAggregateClient aggregate;
public final WeaviateGenerateClient<PropertiesT> generate;
public final WeaviateTenantsClient tenants;
public final WeaviateBatchClient<PropertiesT> batch;

private final CollectionHandleDefaults defaults;

Expand All @@ -36,6 +39,7 @@ public CollectionHandle(
this.query = new WeaviateQueryClient<>(collection, grpcTransport, defaults);
this.generate = new WeaviateGenerateClient<>(collection, grpcTransport, defaults);
this.data = new WeaviateDataClient<>(collection, restTransport, grpcTransport, defaults);
this.batch = new WeaviateBatchClient<>(grpcTransport, collection, defaults);
this.defaults = defaults;

this.tenants = new WeaviateTenantsClient(collection, restTransport, grpcTransport);
Expand All @@ -48,6 +52,7 @@ private CollectionHandle(CollectionHandle<PropertiesT> c, CollectionHandleDefaul
this.query = new WeaviateQueryClient<>(c.query, defaults);
this.generate = new WeaviateGenerateClient<>(c.generate, defaults);
this.data = new WeaviateDataClient<>(c.data, defaults);
this.batch = new WeaviateBatchClient<>(c.batch, defaults);
this.defaults = defaults;

this.tenants = c.tenants;
Expand Down Expand Up @@ -112,7 +117,7 @@ public long size() {
}

/** Default consistency level for requests. */
public ConsistencyLevel consistencyLevel() {
public Optional<ConsistencyLevel> consistencyLevel() {
return defaults.consistencyLevel();
}

Expand All @@ -122,7 +127,7 @@ public CollectionHandle<PropertiesT> withConsistencyLevel(ConsistencyLevel consi
}

/** Default tenant for requests. */
public String tenant() {
public Optional<String> tenant() {
return defaults.tenant();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.weaviate.client6.v1.api.collections;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

Expand Down Expand Up @@ -90,7 +91,7 @@ public CompletableFuture<Long> size() {
}

/** Default consistency level for requests. */
public ConsistencyLevel consistencyLevel() {
public Optional<ConsistencyLevel> consistencyLevel() {
return defaults.consistencyLevel();
}

Expand All @@ -101,7 +102,7 @@ public CollectionHandleAsync<PropertiesT> withConsistencyLevel(ConsistencyLevel
}

/** Default tenant for requests. */
public String tenant() {
public Optional<String> tenant() {
return defaults.tenant();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.weaviate.client6.v1.api.collections;

import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import io.weaviate.client6.v1.api.collections.query.ConsistencyLevel;
import io.weaviate.client6.v1.internal.ObjectBuilder;

public record CollectionHandleDefaults(ConsistencyLevel consistencyLevel, String tenant) {
public record CollectionHandleDefaults(Optional<ConsistencyLevel> consistencyLevel, Optional<String> tenant) {
/**
* Set default values for query / aggregation requests.
*
Expand All @@ -28,8 +31,12 @@ public static Function<Builder, ObjectBuilder<CollectionHandleDefaults>> none()
return ObjectBuilder.identity();
}

public CollectionHandleDefaults {
requireNonNull(consistencyLevel, "consistencyLevel is null");
}

public CollectionHandleDefaults(Builder builder) {
this(builder.consistencyLevel, builder.tenant);
this(Optional.ofNullable(builder.consistencyLevel), Optional.ofNullable(builder.tenant));
}

public static final class Builder implements ObjectBuilder<CollectionHandleDefaults> {
Expand All @@ -56,16 +63,12 @@ public CollectionHandleDefaults build() {

/** Serialize default values to a URL query. */
public Map<String, Object> queryParameters() {
if (consistencyLevel == null && tenant == null) {
if (consistencyLevel.isEmpty() && tenant.isEmpty()) {
return Collections.emptyMap();
}
var query = new HashMap<String, Object>();
if (consistencyLevel != null) {
query.put("consistency_level", consistencyLevel);
}
if (tenant != null) {
query.put("tenant", tenant);
}
Map<String, Object> query = new HashMap<String, Object>();
consistencyLevel.ifPresent(v -> query.put("consistency_level", v));
tenant.ifPresent(v -> query.put("tenant", v));
return query;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ static <T> Rpc<AggregateRequest, WeaviateProtoAggregate.AggregateRequest, Aggreg
if (request.groupBy != null) {
request.groupBy.appendTo(message, collection.collectionName());
}
if (defaults.tenant() != null) {
message.setTenant(defaults.tenant());
}
defaults.tenant().ifPresent(message::setTenant);
return message.build();
},
reply -> {
Expand Down
Loading
Loading