Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ jobs:
strategy:
fail-fast: false
matrix:
WEAVIATE_VERSION: ["1.32.24", "1.33.11", "1.34.7", "1.35.2"]
WEAVIATE_VERSION:
["1.32.24", "1.33.11", "1.34.7", "1.35.2", "1.36.0-rc.0"]
steps:
- uses: actions/checkout@v4

Expand Down
17 changes: 15 additions & 2 deletions src/it/java/io/weaviate/containers/Weaviate.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

public class Weaviate extends WeaviateContainer {
public static final String DOCKER_IMAGE = "semitechnologies/weaviate";
public static final String LATEST_VERSION = Version.V135.semver.toString();
public static final String LATEST_VERSION = Version.latest().semver.toString();
public static final String VERSION;

static {
Expand All @@ -41,17 +41,30 @@ public enum Version {
V132(1, 32, 24),
V133(1, 33, 11),
V134(1, 34, 7),
V135(1, 35, 2);
V135(1, 35, 2),
V136(1, 36, "0-rc.0");

public final SemanticVersion semver;

private Version(int major, int minor, int patch) {
this.semver = new SemanticVersion(major, minor, patch);
}

private Version(int major, int minor, String patch) {
this.semver = new SemanticVersion(major, minor, patch);
}

public void orSkip() {
ConcurrentTest.requireAtLeast(this);
}

public static Version latest() {
Version[] versions = Version.class.getEnumConstants();
if (versions == null) {
throw new IllegalStateException("No versions are defined");
}
return versions[versions.length - 1];
}
}

/**
Expand Down
50 changes: 50 additions & 0 deletions src/it/java/io/weaviate/integration/CollectionsITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.weaviate.client6.v1.api.collections.Quantization;
import io.weaviate.client6.v1.api.collections.ReferenceProperty;
import io.weaviate.client6.v1.api.collections.Replication;
import io.weaviate.client6.v1.api.collections.Replication.AsyncReplicationConfig;
import io.weaviate.client6.v1.api.collections.VectorConfig;
import io.weaviate.client6.v1.api.collections.config.Shard;
import io.weaviate.client6.v1.api.collections.config.ShardStatus;
Expand Down Expand Up @@ -328,4 +329,53 @@ public void test_objectTtl() throws IOException {
.extracting(CollectionConfig::objectTtl).isNotNull()
.returns(false, ObjectTtl::enabled);
}

@Test
public void test_asyncReplicationConfig() throws IOException {
Weaviate.Version.latest().orSkip();

// Arrange
var nsThings = ns("Things");

// Act
var things = client.collections.create(nsThings,
c -> c.replication(Replication.of(
repl -> repl
.asyncEnabled(true)
.asyncReplication(AsyncReplicationConfig.of(
async -> async
.hashTreeHeight(1)
.replicationConcurrency(2)
.replicationFrequencyMillis(3)
.replicationFrequencyMillisWhilePropagating(4)
.nodePingFrequencyMillis(5)
.loggingFrequencyMillis(6)
.diffBatchSize(7)
.diffPerNodeTimeoutSeconds(8)
.prePropagationTimeoutSeconds(9)
.propagationTimeoutSeconds(10)
.propagationDelayMillis(11)
.propagationLimit(12)
.propagationConcurrency(13)
.propagationBatchSize(14))))));

// Assert
Assertions.assertThat(things.config.get()).get()
.extracting(CollectionConfig::replication)
.extracting(Replication::asyncReplicationConfig)
.returns(1, AsyncReplicationConfig::hashTreeHeight)
.returns(2, AsyncReplicationConfig::replicationConcurrency)
.returns(3, AsyncReplicationConfig::replicationFrequencyMillis)
.returns(4, AsyncReplicationConfig::replicationFrequencyMillisWhilePropagating)
.returns(5, AsyncReplicationConfig::nodePingFrequencyMillis)
.returns(6, AsyncReplicationConfig::loggingFrequencyMillis)
.returns(7, AsyncReplicationConfig::diffBatchSize)
.returns(8, AsyncReplicationConfig::diffPerNodeTimeoutSeconds)
.returns(9, AsyncReplicationConfig::prePropagationTimeoutSeconds)
.returns(10, AsyncReplicationConfig::propagationTimeoutSeconds)
.returns(11, AsyncReplicationConfig::propagationDelayMillis)
.returns(12, AsyncReplicationConfig::propagationLimit)
.returns(13, AsyncReplicationConfig::propagationConcurrency)
.returns(14, AsyncReplicationConfig::propagationBatchSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Bm25 build() {
}
}

public record Stopwords(
public static record Stopwords(
/** Selected preset. */
@SerializedName("preset") String preset,
/** Custom words added to the selected preset. */
Expand Down
166 changes: 164 additions & 2 deletions src/main/java/io/weaviate/client6/v1/api/collections/Replication.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
public record Replication(
@SerializedName("factor") Integer replicationFactor,
@SerializedName("asyncEnabled") Boolean asyncEnabled,
@SerializedName("deletionStrategy") DeletionStrategy deletionStrategy) {
@SerializedName("deletionStrategy") DeletionStrategy deletionStrategy,
@SerializedName("asyncConfig") AsyncReplicationConfig asyncReplicationConfig) {

public static enum DeletionStrategy {
@SerializedName("NoAutomatedResolution")
Expand All @@ -28,13 +29,168 @@ public Replication(Builder builder) {
this(
builder.replicationFactor,
builder.asyncEnabled,
builder.deletionStrategy);
builder.deletionStrategy,
builder.asyncReplicationConfig);
}

public static record AsyncReplicationConfig(
@SerializedName("hashtreeHeight") Integer hashTreeHeight,
@SerializedName("maxWorkers") Integer replicationConcurrency,
@SerializedName("frequency") Integer replicationFrequencyMillis,
@SerializedName("frequencyWhilePropagating") Integer replicationFrequencyMillisWhilePropagating,
@SerializedName("aliveNodesCheckingFrequency") Integer nodePingFrequencyMillis,
@SerializedName("loggingFrequency") Integer loggingFrequencyMillis,
@SerializedName("diffBatchSize") Integer diffBatchSize,
@SerializedName("diffPerNodeTimeout") Integer diffPerNodeTimeoutSeconds,
@SerializedName("prePropagationTimeout") Integer prePropagationTimeoutSeconds,
@SerializedName("propagationTimeout") Integer propagationTimeoutSeconds,
@SerializedName("propagationDelay") Integer propagationDelayMillis,
@SerializedName("propagationLimit") Integer propagationLimit,
@SerializedName("propagationConcurrency") Integer propagationConcurrency,
@SerializedName("propagationBatchSize") Integer propagationBatchSize) {

public AsyncReplicationConfig(Builder builder) {
this(
builder.hashTreeHeight,
builder.replicationConcurrency,
builder.replicationFrequencyMillis,
builder.replicationFrequencyMillisWhilePropagating,
builder.nodePingFrequencyMillis,
builder.loggingFrequencyMillis,
builder.diffBatchSize,
builder.diffPerNodeTimeoutSeconds,
builder.prePropagationTimeoutSeconds,
builder.propagationTimeoutSeconds,
builder.propagationDelayMillis,
builder.propagationLimit,
builder.propagationConcurrency,
builder.propagationBatchSize);
}

public static AsyncReplicationConfig of(Function<Builder, ObjectBuilder<AsyncReplicationConfig>> fn) {
return fn.apply(new Builder()).build();
}

public static class Builder implements ObjectBuilder<AsyncReplicationConfig> {
private Integer hashTreeHeight;
private Integer replicationConcurrency;
private Integer replicationFrequencyMillis;
private Integer replicationFrequencyMillisWhilePropagating;
private Integer nodePingFrequencyMillis;
private Integer loggingFrequencyMillis;
private Integer diffBatchSize;
private Integer diffPerNodeTimeoutSeconds;
private Integer prePropagationTimeoutSeconds;
private Integer propagationTimeoutSeconds;
private Integer propagationDelayMillis;
private Integer propagationLimit;
private Integer propagationConcurrency;
private Integer propagationBatchSize;

/** Height of the hashtree used for diffing. */
public Builder hashTreeHeight(int hashTreeHeight) {
this.hashTreeHeight = hashTreeHeight;
return this;
}

/** Maximum number of async replication workers. */
public Builder replicationConcurrency(int replicationConcurrency) {
this.replicationConcurrency = replicationConcurrency;
return this;
}

/**
* Base frequency in milliseconds at which async replication
* runs diff calculations.
*/
public Builder replicationFrequencyMillis(int replicationFrequencyMillis) {
this.replicationFrequencyMillis = replicationFrequencyMillis;
return this;
}

/**
* Frequency in milliseconds at which async replication runs
* while propagation is active.
*/
public Builder replicationFrequencyMillisWhilePropagating(int replicationFrequencyMillisWhilePropagating) {
this.replicationFrequencyMillisWhilePropagating = replicationFrequencyMillisWhilePropagating;
return this;
}

/** Interval in milliseconds at which liveness of target nodes is checked." */
public Builder nodePingFrequencyMillis(int nodePingFrequencyMillis) {
this.nodePingFrequencyMillis = nodePingFrequencyMillis;
return this;
}

/** Interval in seconds at which async replication logs its status. */
public Builder loggingFrequencyMillis(int loggingFrequencyMillis) {
this.loggingFrequencyMillis = loggingFrequencyMillis;
return this;
}

/** Maximum number of object keys included in a single diff batch. */
public Builder diffBatchSize(int diffBatchSize) {
this.diffBatchSize = diffBatchSize;
return this;
}

/** Timeout in seconds for computing a diff against a single node. */
public Builder diffPerNodeTimeoutSeconds(int diffPerNodeTimeoutSeconds) {
this.diffPerNodeTimeoutSeconds = diffPerNodeTimeoutSeconds;
return this;
}

/** Overall timeout in seconds for the pre-propagation phase. */
public Builder prePropagationTimeoutSeconds(int prePropagationTimeoutSeconds) {
this.prePropagationTimeoutSeconds = prePropagationTimeoutSeconds;
return this;
}

/** Timeout in seconds for propagating batch of changes to a node. */
public Builder propagationTimeoutSeconds(int propagationTimeoutSeconds) {
this.propagationTimeoutSeconds = propagationTimeoutSeconds;
return this;
}

/**
* Delay in milliseconds before newly added or updated objects are propagated.
*/
public Builder propagationDelayMillis(int propagationDelayMillis) {
this.propagationDelayMillis = propagationDelayMillis;
return this;
}

/** Maximum number of objects to propagate in a single async replication run. */
public Builder propagationLimit(int propagationLimit) {
this.propagationLimit = propagationLimit;
return this;
}

/** Maximum number of concurrent propagation workers. */
public Builder propagationConcurrency(int propagationConcurrency) {
this.propagationConcurrency = propagationConcurrency;
return this;
}

/** Maximum number of objects to propagate in a single async replication run. */
public Builder propagationBatchSize(int propagationBatchSize) {
this.propagationBatchSize = propagationBatchSize;
return this;
}

@Override
public AsyncReplicationConfig build() {
return new AsyncReplicationConfig(this);
}
}
}

public static class Builder implements ObjectBuilder<Replication> {
private Integer replicationFactor;
private Boolean asyncEnabled;
private DeletionStrategy deletionStrategy;
private AsyncReplicationConfig asyncReplicationConfig;

/** Set desired replication factor for this collection. */
public Builder replicationFactor(int replicationFactor) {
Expand All @@ -57,6 +213,12 @@ public Builder deletionStrategy(DeletionStrategy deletionStrategy) {
return this;
}

/** Configuration parameters for asynchronous replication. */
public Builder asyncReplication(AsyncReplicationConfig asyncReplicationConfig) {
this.asyncReplicationConfig = asyncReplicationConfig;
return this;
}

@Override
public Replication build() {
return new Replication(this);
Expand Down
Loading