diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java index 53da5d3f498ad..fb499ae5e43a5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java @@ -400,8 +400,6 @@ public void testDropRoleIdempotent() throws Exception { Collections.singleton("2,")); } - // Table model - private void testIdempotent( final List beforeSqlList, final String testSql, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index f0796a3b1a038..f161a2496669b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -840,7 +840,7 @@ private TSStatus executeStatementAndClassifyExceptions( receiverId.get(), statement.getPipeLoggingString(), result); - return statement.accept(STATEMENT_STATUS_VISITOR, result); + return STATEMENT_STATUS_VISITOR.process(statement, result); } } catch (final Exception e) { PipeLogger.log( @@ -849,7 +849,7 @@ private TSStatus executeStatementAndClassifyExceptions( "Receiver id = %s: Exception encountered while executing statement %s: ", receiverId.get(), statement.getPipeLoggingString()); - return statement.accept(STATEMENT_EXCEPTION_VISITOR, e); + return STATEMENT_EXCEPTION_VISITOR.process(statement, e); } finally { if (Objects.nonNull(allocatedMemoryBlock)) { allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 7d1b08b123894..49b9b9a87c77c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; @@ -43,6 +44,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement; import org.apache.iotdb.rpc.TSStatusCode; +import java.util.stream.Collectors; + /** * This visitor translated some {@link TSStatus} to pipe related status to help sender classify them * and apply different error handling tactics. Please DO NOT modify the {@link TSStatus} returned by @@ -53,227 +56,215 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor node.accept(this, subStatus)) + .collect(Collectors.toList())) + : node.accept(this, status); + } + + @Override + public TSStatus visitNode(final StatementNode node, final TSStatus status) { + return status; } @Override public TSStatus visitLoadFile( - final LoadTsFileStatement loadTsFileStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() - || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() - && context.getMessage() != null - && context.getMessage().contains("memory")) { + final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() + || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() + && status.getMessage() != null + && status.getMessage().contains("memory")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return super.visitLoadFile(loadTsFileStatement, context); + return super.visitLoadFile(loadTsFileStatement, status); } @Override public TSStatus visitInsertTablet( - final InsertTabletStatement insertTabletStatement, final TSStatus context) { - return visitInsertBase(insertTabletStatement, context); + final InsertTabletStatement insertTabletStatement, final TSStatus status) { + return visitInsertBase(insertTabletStatement, status); } @Override public TSStatus visitInsertRow( - final InsertRowStatement insertRowStatement, final TSStatus context) { - return visitInsertBase(insertRowStatement, context); + final InsertRowStatement insertRowStatement, final TSStatus status) { + return visitInsertBase(insertRowStatement, status); } @Override public TSStatus visitInsertRows( - final InsertRowsStatement insertRowsStatement, final TSStatus context) { - return visitInsertBase(insertRowsStatement, context); + final InsertRowsStatement insertRowsStatement, final TSStatus status) { + return visitInsertBase(insertRowsStatement, status); } @Override public TSStatus visitInsertMultiTablets( - final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) { - return visitInsertBase(insertMultiTabletsStatement, context); + final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) { + return visitInsertBase(insertMultiTabletsStatement, status); } @Override public TSStatus visitInsertBase( - final InsertBaseStatement insertBaseStatement, final TSStatus context) { + final InsertBaseStatement insertBaseStatement, final TSStatus status) { // If the system is read-only, we shall not classify it into temporary unavailable exception to // avoid to many logs - if (context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { - if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) && config.isEnablePartialInsert()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - if (context.getMessage().contains("does not exist")) { + if (status.getMessage().contains("does not exist")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } } - return visitStatement(insertBaseStatement, context); + return visitStatement(insertBaseStatement, status); } @Override public TSStatus visitCreateTimeseries( - final CreateTimeSeriesStatement statement, final TSStatus context) { - return visitGeneralCreateTimeSeries(statement, context); + final CreateTimeSeriesStatement statement, final TSStatus status) { + return visitGeneralCreateTimeSeries(statement, status); } @Override public TSStatus visitCreateAlignedTimeseries( - final CreateAlignedTimeSeriesStatement statement, final TSStatus context) { - return visitGeneralCreateTimeSeries(statement, context); + final CreateAlignedTimeSeriesStatement statement, final TSStatus status) { + return visitGeneralCreateTimeSeries(statement, status); } - private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() - || context.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { + private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() - || context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(statement, context); + return visitStatement(statement, status); } @Override public TSStatus visitCreateMultiTimeSeries( - final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus context) { - return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, context); + final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, status); } @Override public TSStatus visitInternalCreateTimeseries( final InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement, - final TSStatus context) { - return visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context); + final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status); } @Override public TSStatus visitInternalCreateMultiTimeSeries( final InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, - final TSStatus context) { - return visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, context); + final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, status); } - private TSStatus visitGeneralCreateMultiTimeseries( - final Statement statement, final TSStatus context) { - if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() - && status.getCode() != TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { - return visitStatement(statement, context); - } - } + private TSStatus visitGeneralCreateMultiTimeSeries( + final Statement statement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(statement, context); + return visitStatement(statement, status); } @Override public TSStatus visitAlterTimeSeries( - final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { - if (context.getMessage().contains("already")) { + final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (status.getMessage().contains("already")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getMessage().contains("does not")) { + .setMessage(status.getMessage()); + } else if (status.getMessage().contains("does not")) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - } else if (context.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { + } else if (status.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(alterTimeSeriesStatement, context); + return visitStatement(alterTimeSeriesStatement, status); } @Override public TSStatus visitCreateLogicalView( - final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - return visitStatement(createLogicalViewStatement, context); - } - } + final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return super.visitCreateLogicalView(createLogicalViewStatement, context); + return super.visitCreateLogicalView(createLogicalViewStatement, status); } @Override public TSStatus visitActivateTemplate( - final ActivateTemplateStatement activateTemplateStatement, final TSStatus context) { - return visitGeneralActivateTemplate(activateTemplateStatement, context); + final ActivateTemplateStatement activateTemplateStatement, final TSStatus status) { + return visitGeneralActivateTemplate(activateTemplateStatement, status); } @Override public TSStatus visitBatchActivateTemplate( - final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) { - boolean userConflict = false; - if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { - return visitStatement(batchActivateTemplateStatement, context); - } - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && context.isSetMessage() - && context.getMessage().contains("has not been set any template")) { - userConflict = true; - } - } - return (userConflict - ? new TSStatus( - TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - : new TSStatus( - TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) - .setMessage(context.getMessage()); + final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && status.isSetMessage() + && status.getMessage().contains("has not been set any template")) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); } - return visitGeneralActivateTemplate(batchActivateTemplateStatement, context); + return visitGeneralActivateTemplate(batchActivateTemplateStatement, status); } private TSStatus visitGeneralActivateTemplate( - final Statement activateTemplateStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + final Statement activateTemplateStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && context.isSetMessage() - && context.getMessage().contains("has not been set any template")) { + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && status.isSetMessage() + && status.getMessage().contains("has not been set any template")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(activateTemplateStatement, context); + return visitStatement(activateTemplateStatement, status); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index d19aa71c23478..10d0423e6fe93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -154,9 +154,8 @@ public Optional visitLoadFile( TSStatus result; try { result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement, databaseName)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement, databaseName)); // Retry max 5 times if the write process is rejected for (int i = 0; @@ -167,15 +166,14 @@ public Optional visitLoadFile( i++) { Thread.sleep(100L * (i + 1)); result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement, databaseName)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement, databaseName)); } } catch (final Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - result = statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e); + result = IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR.process(statement, e); } if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index 282b378a2d268..ae60b87450aa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -114,9 +114,8 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, tr TSStatus result; try { result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement)); // Retry max 5 times if the write process is rejected for (int i = 0; @@ -127,9 +126,8 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, tr i++) { Thread.sleep(100L * (i + 1)); result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement)); } } catch (final Exception e) { if (e instanceof InterruptedException) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java new file mode 100644 index 0000000000000..81715716a495b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.utils.StatusUtils; +import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class PipeStatementTsStatusVisitorTest { + + @Test + public void testTTLIdempotency() { + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + new InsertRowsStatement(), + new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode()) + .setSubStatus( + Arrays.asList( + StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) + .getCode()); + } +}