diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java index 5fee1ff7f..456ba4fb8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageBuilder.java @@ -186,7 +186,13 @@ else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) { private void swapPrefix(String key, String currentPrefix, String newPrefix) { Object value = headers.remove(key); key = key.substring(currentPrefix.length()); - this.headers.put(newPrefix + key, value); + if (newPrefix.equals(CloudEventMessageUtils.KAFKA_ATTR_PREFIX) + && key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) { + this.headers.put("content-type", value); + } + else { + this.headers.put(newPrefix + key, value); + } } private Message doBuild(String prefix) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java index 14006b4d0..96ee08c2f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtils.java @@ -205,10 +205,16 @@ public static String getType(Message message) { public static String getDataContentType(Message message) { String prefix = determinePrefixToUse(message.getHeaders()); Object value = message.getHeaders().get(prefix + _DATACONTENTTYPE); + if (value == null && isCloudEvent(message)) { + value = message.getHeaders().get("content-type"); + if (value == null) { + value = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); + } + } if (value instanceof byte[] v) { value = toString(v); } - return (String) value; + return value != null ? value.toString() : null; } public static URI getDataSchema(Message message) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java index 3be66e4a7..71e9cf8af 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/cloudevent/CloudEventMessageUtilsAndBuilderTests.java @@ -98,6 +98,30 @@ public void testAttributeRecognitionAndCanonicalization() { assertThat(CloudEventMessageUtils.getSpecVersion(httpMessage)).isEqualTo("1.0"); } + @Test + void buildWithKafkaPrefixUsesContentTypeHeaderForDataContentType() { + Message kafkaMessage = CloudEventMessageBuilder.withData("hello") + .setDataContentType("application/json") + .build(CloudEventMessageUtils.KAFKA_ATTR_PREFIX); + + assertThat(kafkaMessage.getHeaders()) + .containsEntry("content-type", "application/json") + .doesNotContainKey("ce_datacontenttype"); + assertThat(CloudEventMessageUtils.getDataContentType(kafkaMessage)).isEqualTo("application/json"); + } + + @Test + void buildWithDefaultPrefixUsesCloudEventDataContentTypeAttribute() { + Message message = CloudEventMessageBuilder.withData("hello") + .setDataContentType("application/json") + .build(CloudEventMessageUtils.DEFAULT_ATTR_PREFIX); + + assertThat(message.getHeaders()) + .containsEntry("ce-datacontenttype", "application/json") + .doesNotContainKey("content-type"); + assertThat(CloudEventMessageUtils.getDataContentType(message)).isEqualTo("application/json"); + } + @Test void canonicalizeHeadersWithPossibleCopyReturnsCopyWithUpdatedHeadersWhenModified() { // TODO add the following test cases