Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,6 @@ public void testDropRoleIdempotent() throws Exception {
Collections.singleton("2,"));
}

// Table model

private void testIdempotent(
final List<String> beforeSqlList,
final String testSql,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ private TSStatus executeStatementAndClassifyExceptions(
receiverId.get(),
statement.getPipeLoggingString(),
result);
return statement.accept(STATEMENT_STATUS_VISITOR, result);
return STATEMENT_STATUS_VISITOR.process(statement, result);
}
} catch (final Exception e) {
PipeLogger.log(
Expand All @@ -849,7 +849,7 @@ private TSStatus executeStatementAndClassifyExceptions(
"Receiver id = %s: Exception encountered while executing statement %s: ",
receiverId.get(),
statement.getPipeLoggingString());
return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
return STATEMENT_EXCEPTION_VISITOR.process(statement, e);
} finally {
if (Objects.nonNull(allocatedMemoryBlock)) {
allocatedMemoryBlock.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
Expand All @@ -43,6 +44,8 @@
import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
import org.apache.iotdb.rpc.TSStatusCode;

import java.util.stream.Collectors;

/**
* This visitor translated some {@link TSStatus} to pipe related status to help sender classify them
* and apply different error handling tactics. Please DO NOT modify the {@link TSStatus} returned by
Expand All @@ -53,227 +56,215 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor<TSStatus, TSS
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

@Override
public TSStatus visitNode(final StatementNode node, final TSStatus context) {
return context;
public TSStatus process(final StatementNode node, final TSStatus status) {
return status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()
? PipeReceiverStatusHandler.getPriorStatus(
status.getSubStatus().stream()
.map(subStatus -> node.accept(this, subStatus))
.collect(Collectors.toList()))
: node.accept(this, status);
}

@Override
public TSStatus visitNode(final StatementNode node, final TSStatus status) {
return status;
}

@Override
public TSStatus visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
|| context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
&& context.getMessage() != null
&& context.getMessage().contains("memory")) {
final LoadTsFileStatement loadTsFileStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
|| status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode()
&& status.getMessage() != null
&& status.getMessage().contains("memory")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
return super.visitLoadFile(loadTsFileStatement, context);
return super.visitLoadFile(loadTsFileStatement, status);
}

@Override
public TSStatus visitInsertTablet(
final InsertTabletStatement insertTabletStatement, final TSStatus context) {
return visitInsertBase(insertTabletStatement, context);
final InsertTabletStatement insertTabletStatement, final TSStatus status) {
return visitInsertBase(insertTabletStatement, status);
}

@Override
public TSStatus visitInsertRow(
final InsertRowStatement insertRowStatement, final TSStatus context) {
return visitInsertBase(insertRowStatement, context);
final InsertRowStatement insertRowStatement, final TSStatus status) {
return visitInsertBase(insertRowStatement, status);
}

@Override
public TSStatus visitInsertRows(
final InsertRowsStatement insertRowsStatement, final TSStatus context) {
return visitInsertBase(insertRowsStatement, context);
final InsertRowsStatement insertRowsStatement, final TSStatus status) {
return visitInsertBase(insertRowsStatement, status);
}

@Override
public TSStatus visitInsertMultiTablets(
final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) {
return visitInsertBase(insertMultiTabletsStatement, context);
final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) {
return visitInsertBase(insertMultiTabletsStatement, status);
}

@Override
public TSStatus visitInsertBase(
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
final InsertBaseStatement insertBaseStatement, final TSStatus status) {
// If the system is read-only, we shall not classify it into temporary unavailable exception to
// avoid to many logs
if (context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
.setMessage(status.getMessage());
} else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
.setMessage(status.getMessage());
} else if (status.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
.setMessage(status.getMessage());
} else if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
if (status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING)
&& config.isEnablePartialInsert()) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
if (context.getMessage().contains("does not exist")) {
if (status.getMessage().contains("does not exist")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
}
return visitStatement(insertBaseStatement, context);
return visitStatement(insertBaseStatement, status);
}

@Override
public TSStatus visitCreateTimeseries(
final CreateTimeSeriesStatement statement, final TSStatus context) {
return visitGeneralCreateTimeSeries(statement, context);
final CreateTimeSeriesStatement statement, final TSStatus status) {
return visitGeneralCreateTimeSeries(statement, status);
}

@Override
public TSStatus visitCreateAlignedTimeseries(
final CreateAlignedTimeSeriesStatement statement, final TSStatus context) {
return visitGeneralCreateTimeSeries(statement, context);
final CreateAlignedTimeSeriesStatement statement, final TSStatus status) {
return visitGeneralCreateTimeSeries(statement, status);
}

private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus context) {
if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
|| context.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus status) {
if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
|| status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
|| context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
.setMessage(status.getMessage());
} else if (status.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
|| status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
return visitStatement(statement, context);
return visitStatement(statement, status);
}

@Override
public TSStatus visitCreateMultiTimeSeries(
final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus context) {
return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, context);
final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus status) {
return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, status);
}

@Override
public TSStatus visitInternalCreateTimeseries(
final InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement,
final TSStatus context) {
return visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context);
final TSStatus status) {
return visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status);
}

@Override
public TSStatus visitInternalCreateMultiTimeSeries(
final InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement,
final TSStatus context) {
return visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, context);
final TSStatus status) {
return visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, status);
}

private TSStatus visitGeneralCreateMultiTimeseries(
final Statement statement, final TSStatus context) {
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (final TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
&& status.getCode() != TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return visitStatement(statement, context);
}
}
private TSStatus visitGeneralCreateMultiTimeSeries(
final Statement statement, final TSStatus status) {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()
|| status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
.setMessage(status.getMessage());
} else if (status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
return visitStatement(statement, context);
return visitStatement(statement, status);
}

@Override
public TSStatus visitAlterTimeSeries(
final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
if (context.getMessage().contains("already")) {
final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) {
if (status.getMessage().contains("already")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getMessage().contains("does not")) {
.setMessage(status.getMessage());
} else if (status.getMessage().contains("does not")) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
} else if (context.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
} else if (status.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
return visitStatement(alterTimeSeriesStatement, context);
return visitStatement(alterTimeSeriesStatement, status);
}

@Override
public TSStatus visitCreateLogicalView(
final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (final TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return visitStatement(createLogicalViewStatement, context);
}
}
final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
return super.visitCreateLogicalView(createLogicalViewStatement, context);
return super.visitCreateLogicalView(createLogicalViewStatement, status);
}

@Override
public TSStatus visitActivateTemplate(
final ActivateTemplateStatement activateTemplateStatement, final TSStatus context) {
return visitGeneralActivateTemplate(activateTemplateStatement, context);
final ActivateTemplateStatement activateTemplateStatement, final TSStatus status) {
return visitGeneralActivateTemplate(activateTemplateStatement, status);
}

@Override
public TSStatus visitBatchActivateTemplate(
final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) {
boolean userConflict = false;
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (final TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return visitStatement(batchActivateTemplateStatement, context);
}
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
&& context.isSetMessage()
&& context.getMessage().contains("has not been set any template")) {
userConflict = true;
}
}
return (userConflict
? new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
: new TSStatus(
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()))
.setMessage(context.getMessage());
final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(status.getMessage());
}
if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
&& status.isSetMessage()
&& status.getMessage().contains("has not been set any template")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(status.getMessage());
}
return visitGeneralActivateTemplate(batchActivateTemplateStatement, context);
return visitGeneralActivateTemplate(batchActivateTemplateStatement, status);
}

private TSStatus visitGeneralActivateTemplate(
final Statement activateTemplateStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
final Statement activateTemplateStatement, final TSStatus status) {
if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
&& context.isSetMessage()
&& context.getMessage().contains("has not been set any template")) {
if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()
&& status.isSetMessage()
&& status.getMessage().contains("has not been set any template")) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
.setMessage(status.getMessage());
}
return visitStatement(activateTemplateStatement, context);
return visitStatement(activateTemplateStatement, status);
}
}
Loading