Skip to content
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions external-service-impl/mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@
</ignoredDependencies>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.mqtt.i18n;

public final class MqttMessages {

// --- LinePayloadFormatter ---
public static final String INVALID_LINE_PROTOCOL = "Invalid line protocol format ,line is {}";
public static final String TAGS_ERROR = "The tags is error , line is {}";
public static final String ATTRIBUTES_ERROR = "The attributes is error , line is {}";
public static final String FIELDS_ERROR = "The fields is error , line is {}";
public static final String TIMESTAMP_ERROR = "The timestamp is error , line is {}";

// --- MPPPublishHandler ---
public static final String ON_PUBLISH_EXCEPTION =
"onPublish execution exception, msg is [{}], error is ";
public static final String PROCESS_RESULT = "process result: {}";

// --- MQTTService ---
public static final String SERVER_START_EXCEPTION = "Exception while starting server";
public static final String STOPPING_MQTT_SERVICE = "Stopping IoTDB MQTT service...";
public static final String MQTT_SERVICE_STOPPED = "IoTDB MQTT service stopped.";

// --- PayloadFormatManager ---
public static final String MQTT_DIR = "mqttDir: {}";
public static final String PAYLOAD_FORMAT_MANAGER_INIT_ERROR =
"MQTT PayloadFormatManager init() error.";
public static final String FORMATTER_IS_NULL = "PayloadFormatManager(), formatter is null.";
public static final String FIND_MQTT_PLUGIN =
"PayloadFormatManager(), find MQTT Payload Plugin {}.";
public static final String MQTT_PLUGIN_JAR_URLS = "MQTT Plugin jarURLs: {}";

// --- JSONPayloadFormatter ---
public static final String PAYLOAD_INVALID = "payload is invalidate";

private MqttMessages() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.mqtt.i18n;

public final class MqttMessages {

// --- LinePayloadFormatter ---
public static final String INVALID_LINE_PROTOCOL = "行协议格式无效,行内容:{}";
public static final String TAGS_ERROR = "标签格式错误,行内容:{}";
public static final String ATTRIBUTES_ERROR = "属性格式错误,行内容:{}";
public static final String FIELDS_ERROR = "字段格式错误,行内容:{}";
public static final String TIMESTAMP_ERROR = "时间戳格式错误,行内容:{}";

// --- MPPPublishHandler ---
public static final String ON_PUBLISH_EXCEPTION =
"onPublish 执行异常,消息为 [{}],错误:";
public static final String PROCESS_RESULT = "处理结果:{}";

// --- MQTTService ---
public static final String SERVER_START_EXCEPTION = "启动服务器时发生异常";
public static final String STOPPING_MQTT_SERVICE = "正在停止 IoTDB MQTT 服务...";
public static final String MQTT_SERVICE_STOPPED = "IoTDB MQTT 服务已停止。";

// --- PayloadFormatManager ---
public static final String MQTT_DIR = "mqttDir:{}";
public static final String PAYLOAD_FORMAT_MANAGER_INIT_ERROR =
"MQTT PayloadFormatManager init() 出错。";
public static final String FORMATTER_IS_NULL = "PayloadFormatManager(),formatter 为 null。";
public static final String FIND_MQTT_PLUGIN =
"PayloadFormatManager(),找到 MQTT Payload 插件 {}。";
public static final String MQTT_PLUGIN_JAR_URLS = "MQTT 插件 jarURLs:{}";

// --- JSONPayloadFormatter ---
public static final String PAYLOAD_INVALID = "payload 无效";

private MqttMessages() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.iotdb.mqtt;

import org.apache.iotdb.mqtt.i18n.MqttMessages;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
Expand Down Expand Up @@ -79,7 +81,7 @@ public List<Message> format(String topic, ByteBuf payload) {
}
return messages;
}
throw new JsonParseException("payload is invalidate");
throw new JsonParseException(MqttMessages.PAYLOAD_INVALID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.iotdb.mqtt;

import org.apache.iotdb.mqtt.i18n.MqttMessages;

import io.netty.buffer.ByteBuf;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.external.commons.lang3.NotImplementedException;
Expand Down Expand Up @@ -83,7 +85,7 @@ public List<Message> format(String topic, ByteBuf payload) {
try {
Matcher matcher = pattern.matcher(line.trim());
if (!matcher.matches()) {
log.warn("Invalid line protocol format ,line is {}", line);
log.warn(MqttMessages.INVALID_LINE_PROTOCOL, line);
continue;
}

Expand All @@ -95,25 +97,25 @@ public List<Message> format(String topic, ByteBuf payload) {

// Parsing Tags
if (!setTags(matcher, message)) {
log.warn("The tags is error , line is {}", line);
log.warn(MqttMessages.TAGS_ERROR, line);
continue;
}

// Parsing Attributes
if (!setAttributes(matcher, message)) {
log.warn("The attributes is error , line is {}", line);
log.warn(MqttMessages.ATTRIBUTES_ERROR, line);
continue;
}

// Parsing Fields
if (!setFields(matcher, message)) {
log.warn("The fields is error , line is {}", line);
log.warn(MqttMessages.FIELDS_ERROR, line);
continue;
}

// Parsing timestamp
if (!setTimestamp(matcher, message)) {
log.warn("The timestamp is error , line is {}", line);
log.warn(MqttMessages.TIMESTAMP_ERROR, line);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.mqtt.i18n.MqttMessages;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;

Expand Down Expand Up @@ -159,7 +160,7 @@ public void onPublish(InterceptPublishMessage msg) {
}
}
} catch (Throwable t) {
LOG.warn("onPublish execution exception, msg is [{}], error is ", msg, t);
LOG.warn(MqttMessages.ON_PUBLISH_EXCEPTION, msg, t);
} finally {
// release the payload of the message
super.onPublish(msg);
Expand Down Expand Up @@ -191,7 +192,7 @@ private void insertTable(TableMessage message, MqttClientSession session) {

tsStatus = result.status;
if (LOG.isDebugEnabled()) {
LOG.debug("process result: {}", tsStatus);
LOG.debug(MqttMessages.PROCESS_RESULT, tsStatus);
}
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
Expand Down Expand Up @@ -310,7 +311,7 @@ private void insertTree(TreeMessage message, MqttClientSession session) {
false);
tsStatus = result.status;
if (LOG.isDebugEnabled()) {
LOG.debug("process result: {}", tsStatus);
LOG.debug(MqttMessages.PROCESS_RESULT, tsStatus);
}
if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& tsStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.externalservice.api.IExternalService;
import org.apache.iotdb.mqtt.i18n.MqttMessages;

import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void startup() {
try {
server.startServer(config, handlers, null, authenticator, null);
} catch (IOException e) {
throw new RuntimeException("Exception while starting server", e);
throw new RuntimeException(MqttMessages.SERVER_START_EXCEPTION, e);
}

LOG.info(
Expand All @@ -74,9 +75,9 @@ public void startup() {
.addShutdownHook(
new Thread(
() -> {
LOG.info("Stopping IoTDB MQTT service...");
LOG.info(MqttMessages.STOPPING_MQTT_SERVICE);
shutdown();
LOG.info("IoTDB MQTT service stopped.");
LOG.info(MqttMessages.MQTT_SERVICE_STOPPED);
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.mqtt.i18n.MqttMessages;

import com.google.common.base.Preconditions;
import org.apache.tsfile.external.commons.io.FileUtils;
Expand Down Expand Up @@ -54,13 +55,13 @@ private PayloadFormatManager() {}

private static void init() {
mqttDir = IoTDBDescriptor.getInstance().getConfig().getMqttDir();
logger.info("mqttDir: {}", mqttDir);
logger.info(MqttMessages.MQTT_DIR, mqttDir);

try {
makeMqttPluginDir();
buildMqttPluginMap();
} catch (IOException e) {
logger.error("MQTT PayloadFormatManager init() error.", e);
logger.error(MqttMessages.PAYLOAD_FORMAT_MANAGER_INIT_ERROR, e);
}
}

Expand All @@ -83,17 +84,17 @@ private static void buildMqttPluginMap() throws IOException {
ServiceLoader.load(PayloadFormatter.class, PayloadFormatManager.class.getClassLoader());
for (PayloadFormatter formatter : payloadFormatters) {
if (formatter == null) {
logger.error("PayloadFormatManager(), formatter is null.");
logger.error(MqttMessages.FORMATTER_IS_NULL);
continue;
}

String pluginName = formatter.getName();
mqttPayloadPluginMap.put(pluginName, formatter);
logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName);
logger.info(MqttMessages.FIND_MQTT_PLUGIN, pluginName);
}

URL[] jarURLs = getPluginJarURLs(mqttDir);
logger.debug("MQTT Plugin jarURLs: {}", Arrays.toString(jarURLs));
logger.debug(MqttMessages.MQTT_PLUGIN_JAR_URLS, Arrays.toString(jarURLs));

for (URL jarUrl : jarURLs) {
ClassLoader classLoader = new URLClassLoader(new URL[] {jarUrl});
Expand All @@ -104,7 +105,7 @@ private static void buildMqttPluginMap() throws IOException {

for (PayloadFormatter formatter : payloadFormatters2) {
if (formatter == null) {
logger.error("PayloadFormatManager(), formatter is null.");
logger.error(MqttMessages.FORMATTER_IS_NULL);
continue;
}

Expand All @@ -113,7 +114,7 @@ private static void buildMqttPluginMap() throws IOException {
continue;
}
mqttPayloadPluginMap.put(pluginName, formatter);
logger.info("PayloadFormatManager(), find MQTT Payload Plugin {}.", pluginName);
logger.info(MqttMessages.FIND_MQTT_PLUGIN, pluginName);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions external-service-impl/rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@
</usedDependencies>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rest.i18n;

public final class RestMessages {

// --- RestService ---
public static final String REST_SERVICE_START_FAILED = "RestService failed to start: {}";
public static final String REST_SERVICE_START_SUCCESS = "start RestService successfully";
public static final String REST_SERVICE_STOP_FAILED = "RestService failed to stop: {}";

// --- StatementConstructionHandler (v1 / v2 / table) ---
public static final String INVALID_INPUT = "Invalid input: ";

// --- RequestValidationHandler (v2) ---
public static final String PREFIX_PATHS_EMPTY = "prefix_paths should not be empty";

private RestMessages() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.rest.i18n;

public final class RestMessages {

// --- RestService ---
public static final String REST_SERVICE_START_FAILED = "RestService 启动失败:{}";
public static final String REST_SERVICE_START_SUCCESS = "RestService 启动成功";
public static final String REST_SERVICE_STOP_FAILED = "RestService 停止失败:{}";

// --- StatementConstructionHandler (v1 / v2 / table) ---
public static final String INVALID_INPUT = "无效输入:";

// --- RequestValidationHandler (v2) ---
public static final String PREFIX_PATHS_EMPTY = "prefix_paths 不能为空";

private RestMessages() {}
}
Loading