diff --git a/android/data/repositories/src/main/kotlin/com/gemwallet/android/data/repositories/stream/StreamEventHandler.kt b/android/data/repositories/src/main/kotlin/com/gemwallet/android/data/repositories/stream/StreamEventHandler.kt index 06efb223ef..f3c6f90514 100644 --- a/android/data/repositories/src/main/kotlin/com/gemwallet/android/data/repositories/stream/StreamEventHandler.kt +++ b/android/data/repositories/src/main/kotlin/com/gemwallet/android/data/repositories/stream/StreamEventHandler.kt @@ -26,7 +26,7 @@ import com.wallet.core.primitives.StreamEvent import com.wallet.core.primitives.StreamNotificationUpdate import com.wallet.core.primitives.StreamTransactionsUpdate import com.wallet.core.primitives.StreamWalletUpdate -import com.wallet.core.primitives.SupportMessage +import com.wallet.core.primitives.SupportStreamEvent import com.wallet.core.primitives.WebSocketPricePayload import kotlinx.coroutines.flow.firstOrNull @@ -115,8 +115,11 @@ class StreamEventHandler( inAppNotificationsDao.put(listOf(update.notification.toRecord())) } - private suspend fun handleSupport(message: SupportMessage) { - supportMessagesDao.addMessages(listOf(message.toRecord())) + private suspend fun handleSupport(event: SupportStreamEvent) { + when (event) { + is SupportStreamEvent.Message -> supportMessagesDao.addMessages(listOf(event.data.toRecord())) + is SupportStreamEvent.Typing -> {} + } } companion object { diff --git a/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Stream.kt b/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Stream.kt index fb7c0b3d69..bd0e534126 100644 --- a/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Stream.kt +++ b/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Stream.kt @@ -68,7 +68,7 @@ sealed class StreamEvent { data class FiatTransaction(val data: StreamWalletUpdate): StreamEvent() @Serializable @SerialName("support") - data class Support(val data: SupportMessage): StreamEvent() + data class Support(val data: SupportStreamEvent): StreamEvent() } @Serializable diff --git a/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Support.kt b/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Support.kt index f0bbfdc0b6..3d4311967c 100644 --- a/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Support.kt +++ b/android/gemcore/src/main/kotlin/com/wallet/core/primitives/generated/Support.kt @@ -58,6 +58,20 @@ data class SupportMessageInput ( val content: String ) +@Serializable +enum class SupportTypingStatus(val string: String) { + @SerialName("on") + On("on"), + @SerialName("off") + Off("off"), +} + +@Serializable +data class SupportTyping ( + val status: SupportTypingStatus, + val agent: SupportAgent +) + @Serializable sealed class SupportAction { @Serializable @@ -69,10 +83,12 @@ sealed class SupportAction { } @Serializable -enum class SupportTypingStatus(val string: String) { - @SerialName("on") - On("on"), - @SerialName("off") - Off("off"), +sealed class SupportStreamEvent { + @Serializable + @SerialName("message") + data class Message(val data: SupportMessage): SupportStreamEvent() + @Serializable + @SerialName("typing") + data class Typing(val data: SupportTyping): SupportStreamEvent() } diff --git a/core/apps/daemon/src/consumers/support/support_webhook_consumer.rs b/core/apps/daemon/src/consumers/support/support_webhook_consumer.rs index 0d82909ae4..f5793146d7 100644 --- a/core/apps/daemon/src/consumers/support/support_webhook_consumer.rs +++ b/core/apps/daemon/src/consumers/support/support_webhook_consumer.rs @@ -43,13 +43,13 @@ impl MessageConsumer for SupportWebhookConsumer { }; match self.support_client.process_webhook(&device, &webhook).await { - Ok((notifications, stream_events)) => { + Ok(result) => { info_with_fields!( "support webhook processed", device_id = device_id, event = webhook.event, - notifications = notifications, - stream_events = stream_events + notifications = result.notifications, + stream_events = result.stream_events ); Ok(true) } diff --git a/core/crates/primitives/src/lib.rs b/core/crates/primitives/src/lib.rs index 6e5c63c463..ae92e3785d 100644 --- a/core/crates/primitives/src/lib.rs +++ b/core/crates/primitives/src/lib.rs @@ -227,7 +227,10 @@ pub use self::websocket::{WebSocketPriceAction, WebSocketPriceActionType, WebSoc pub mod stream; pub use self::stream::{StreamBalanceUpdate, StreamEvent, StreamMessage, StreamMessagePrices, StreamTransactionsUpdate, StreamWalletUpdate, device_stream_channel}; pub mod support; -pub use self::support::{SupportAction, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageInput, SupportMessageSender, SupportMessageStatus, SupportTypingStatus}; +pub use self::support::{ + SupportAction, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageInput, SupportMessageSender, SupportMessageStatus, SupportStreamEvent, SupportTyping, + SupportTypingStatus, +}; pub mod asset_balance; pub use self::asset_balance::{AddressBalances, AssetBalance, Balance}; pub mod chain_address; diff --git a/core/crates/primitives/src/stream.rs b/core/crates/primitives/src/stream.rs index 58fbf06961..bc4c6e2a45 100644 --- a/core/crates/primitives/src/stream.rs +++ b/core/crates/primitives/src/stream.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use typeshare::typeshare; -use crate::{AssetId, InAppNotification, SupportMessage, TransactionId, WalletId, WebSocketPricePayload}; +use crate::{AssetId, InAppNotification, SupportStreamEvent, TransactionId, WalletId, WebSocketPricePayload}; pub const DEVICE_STREAM_CHANNEL_PREFIX: &str = "stream:device:"; @@ -22,7 +22,7 @@ pub enum StreamEvent { Perpetual(StreamWalletUpdate), InAppNotification(StreamNotificationUpdate), FiatTransaction(StreamWalletUpdate), - Support(SupportMessage), + Support(SupportStreamEvent), } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/core/crates/primitives/src/support.rs b/core/crates/primitives/src/support.rs index 5b0c9a967d..13fbe2a9ae 100644 --- a/core/crates/primitives/src/support.rs +++ b/core/crates/primitives/src/support.rs @@ -88,6 +88,21 @@ pub enum SupportTypingStatus { Off, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[typeshare(swift = "Sendable, Equatable")] +pub struct SupportTyping { + pub status: SupportTypingStatus, + pub agent: SupportAgent, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare(swift = "Sendable")] +#[serde(tag = "type", content = "data", rename_all = "camelCase")] +pub enum SupportStreamEvent { + Message(SupportMessage), + Typing(SupportTyping), +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[typeshare(swift = "Sendable, Equatable")] #[serde(tag = "type", content = "data", rename_all = "camelCase")] diff --git a/core/crates/support/src/client.rs b/core/crates/support/src/client.rs index a015d30fb2..65c5912631 100644 --- a/core/crates/support/src/client.rs +++ b/core/crates/support/src/client.rs @@ -1,14 +1,25 @@ -use crate::{ChatwootWebhookPayload, constants::EVENT_MESSAGE_CREATED, markdown_plain_text}; +use crate::{ + ChatwootWebhookPayload, + constants::{EVENT_CONVERSATION_TYPING_OFF, EVENT_CONVERSATION_TYPING_ON, EVENT_MESSAGE_CREATED}, + markdown_plain_text, +}; use cacher::CacherClient; use localizer::LanguageLocalizer; use primitives::{ - Device, GorushNotification, PushNotification, PushNotificationTypes, StreamEvent, SupportMessage, device_stream_channel, push_notification::PushNotificationSupport, + Device, GorushNotification, PushNotification, PushNotificationTypes, StreamEvent, SupportMessage, SupportStreamEvent, SupportTypingStatus, device_stream_channel, + push_notification::PushNotificationSupport, }; use std::error::Error; use storage::database::devices::DevicesStore; use storage::{Database, OptionalExtension}; use streamer::{NotificationsPayload, StreamProducer, StreamProducerQueue}; +#[derive(Debug, Default)] +pub struct SupportWebhookResult { + pub notifications: usize, + pub stream_events: usize, +} + pub struct SupportClient { database: Database, stream_producer: StreamProducer, @@ -28,21 +39,35 @@ impl SupportClient { Ok(DevicesStore::get_device(&mut self.database.client()?, device_id).optional()?.map(|d| d.as_primitive())) } - pub async fn process_webhook(&self, device: &Device, payload: &ChatwootWebhookPayload) -> Result<(usize, usize), Box> { - if payload.event.as_str() != EVENT_MESSAGE_CREATED { - return Ok((0, 0)); + pub async fn process_webhook(&self, device: &Device, payload: &ChatwootWebhookPayload) -> Result> { + match payload.event.as_str() { + EVENT_MESSAGE_CREATED => self.process_message_created(device, payload).await, + EVENT_CONVERSATION_TYPING_ON => self.process_typing(device, payload, SupportTypingStatus::On).await, + EVENT_CONVERSATION_TYPING_OFF => self.process_typing(device, payload, SupportTypingStatus::Off).await, + _ => Ok(SupportWebhookResult::default()), } + } - let notifications_count = if let Some(notification) = Self::build_notification(device, payload) { + async fn process_message_created(&self, device: &Device, payload: &ChatwootWebhookPayload) -> Result> { + let notifications = if let Some(notification) = Self::build_notification(device, payload) { self.stream_producer.publish_notifications_support(NotificationsPayload::new(vec![notification])).await?; 1 } else { 0 }; - let stream_events_count = self.publish_stream_message(device, payload).await?; + let stream_events = self.publish_stream_message(device, payload).await?; + + Ok(SupportWebhookResult { notifications, stream_events }) + } + + async fn process_typing(&self, device: &Device, payload: &ChatwootWebhookPayload, status: SupportTypingStatus) -> Result> { + let Some(typing) = payload.support_typing(status) else { + return Ok(SupportWebhookResult::default()); + }; - Ok((notifications_count, stream_events_count)) + self.publish_event(device, StreamEvent::Support(SupportStreamEvent::Typing(typing))).await?; + Ok(SupportWebhookResult { notifications: 0, stream_events: 1 }) } fn build_notification(device: &Device, payload: &ChatwootWebhookPayload) -> Option { @@ -68,14 +93,17 @@ impl SupportClient { return Ok(0); } - let channel = device_stream_channel(&device.id); - self.cacher.publish(&channel, &StreamEvent::Support(message)).await?; + self.publish_event(device, StreamEvent::Support(SupportStreamEvent::Message(message))).await?; Ok(1) } fn should_publish_stream_message(message: &SupportMessage) -> bool { message.sender.is_agent() } + + async fn publish_event(&self, device: &Device, event: StreamEvent) -> Result<(), Box> { + self.cacher.publish(&device_stream_channel(&device.id), &event).await + } } #[cfg(test)] diff --git a/core/crates/support/src/constants.rs b/core/crates/support/src/constants.rs index 08ad96144f..3fc265a2c1 100644 --- a/core/crates/support/src/constants.rs +++ b/core/crates/support/src/constants.rs @@ -4,6 +4,8 @@ pub(crate) const CHATWOOT_DELIVERY_STATUS_READ: &str = "read"; pub(crate) const CHATWOOT_DELIVERY_STATUS_SENT: &str = "sent"; pub(crate) const CHATWOOT_FILE_TYPE_IMAGE: &str = "image"; +pub(crate) const EVENT_CONVERSATION_TYPING_OFF: &str = "conversation_typing_off"; +pub(crate) const EVENT_CONVERSATION_TYPING_ON: &str = "conversation_typing_on"; pub(crate) const EVENT_MESSAGE_CREATED: &str = "message_created"; pub(crate) const PATH_CONFIG: &str = "config"; diff --git a/core/crates/support/src/lib.rs b/core/crates/support/src/lib.rs index c5fc19b488..ef4f48ced8 100644 --- a/core/crates/support/src/lib.rs +++ b/core/crates/support/src/lib.rs @@ -5,6 +5,6 @@ mod model; mod text; pub use chatwoot::ChatwootClient; -pub use client::SupportClient; +pub use client::{SupportClient, SupportWebhookResult}; pub use model::*; pub use text::markdown_plain_text; diff --git a/core/crates/support/src/model.rs b/core/crates/support/src/model.rs index 1b76eb6249..0f0f502e08 100644 --- a/core/crates/support/src/model.rs +++ b/core/crates/support/src/model.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use primitives::{Device, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageSender, SupportMessageStatus, SupportTypingStatus}; +use primitives::{Device, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageSender, SupportMessageStatus, SupportTyping, SupportTypingStatus}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -63,6 +63,8 @@ pub struct ChatwootWebhookPayload { #[serde(default)] pub created_at: Option, pub sender: Option, + pub user: Option, + pub is_private: Option, #[serde(default)] pub attachments: Vec, #[serde(default)] @@ -112,6 +114,13 @@ pub struct Sender { pub custom_attributes: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebhookUser { + pub name: Option, + #[serde(rename = "type")] + pub user_type: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Attachment { pub id: i64, @@ -152,6 +161,18 @@ impl ChatwootWebhookPayload { &self.attachments, ) } + + pub fn support_typing(&self, status: SupportTypingStatus) -> Option { + if self.is_private == Some(true) { + return None; + } + let user = self.user.as_ref()?; + if user.user_type.as_deref() != Some("user") { + return None; + } + let name = user.name.clone()?; + Some(SupportTyping { status, agent: SupportAgent { name } }) + } } impl Message { @@ -373,6 +394,24 @@ mod tests { assert_eq!(messages[0].status, SupportMessageStatus::Sent); } + #[test] + fn test_support_typing() { + let agent_on: ChatwootWebhookPayload = serde_json::from_str(include_str!("../tests/testdata/chatwoot_conversation_typing_on.json")).unwrap(); + + let typing = agent_on.support_typing(SupportTypingStatus::On).unwrap(); + + assert_eq!(typing.status, SupportTypingStatus::On); + assert_eq!(typing.agent, SupportAgent { name: "Test Agent".to_string() }); + assert_eq!(agent_on.get_device_id(), Some("test-device-id".to_string())); + + let private: ChatwootWebhookPayload = serde_json::from_str(r#"{"event":"conversation_typing_on","is_private":true,"user":{"name":"Test Agent","type":"user"}}"#).unwrap(); + assert_eq!(private.support_typing(SupportTypingStatus::On), None); + + let contact: ChatwootWebhookPayload = + serde_json::from_str(r#"{"event":"conversation_typing_on","is_private":false,"user":{"name":"test-user","type":"contact"}}"#).unwrap(); + assert_eq!(contact.support_typing(SupportTypingStatus::On), None); + } + #[test] fn test_support_messages_keep_missing_private_strict() { let message: Message = serde_json::from_str( diff --git a/core/crates/support/tests/testdata/chatwoot_conversation_typing_on.json b/core/crates/support/tests/testdata/chatwoot_conversation_typing_on.json new file mode 100644 index 0000000000..54948398b5 --- /dev/null +++ b/core/crates/support/tests/testdata/chatwoot_conversation_typing_on.json @@ -0,0 +1,18 @@ +{ + "event": "conversation_typing_on", + "is_private": false, + "user": { + "name": "Test Agent", + "type": "user" + }, + "conversation": { + "id": 1, + "meta": { + "sender": { + "custom_attributes": { + "device_id": "test-device-id" + } + } + } + } +} diff --git a/ios/Packages/FeatureServices/StreamService/StreamEventService.swift b/ios/Packages/FeatureServices/StreamService/StreamEventService.swift index 64653fc160..0f500ad284 100644 --- a/ios/Packages/FeatureServices/StreamService/StreamEventService.swift +++ b/ios/Packages/FeatureServices/StreamService/StreamEventService.swift @@ -69,8 +69,13 @@ public struct StreamEventService: Sendable { Task { await perform { try await priceAlertService.update() } } case let .fiatTransaction(update): Task { await perform { try await handleFiatTransactionUpdate(update) } } - case let .support(message): - await perform { try supportChatStore.addMessages([message]) } + case let .support(event): + switch event { + case let .message(message): + await perform { try supportChatStore.addMessages([message]) } + case .typing: + break + } } } } diff --git a/ios/Packages/Primitives/Sources/Generated/Stream.swift b/ios/Packages/Primitives/Sources/Generated/Stream.swift index d436d1e7ce..3b77ed28a1 100644 --- a/ios/Packages/Primitives/Sources/Generated/Stream.swift +++ b/ios/Packages/Primitives/Sources/Generated/Stream.swift @@ -67,7 +67,7 @@ public enum StreamEvent: Codable, Sendable { case perpetual(StreamWalletUpdate) case inAppNotification(StreamNotificationUpdate) case fiatTransaction(StreamWalletUpdate) - case support(SupportMessage) + case support(SupportStreamEvent) enum CodingKeys: String, CodingKey, Codable { case prices, @@ -130,7 +130,7 @@ public enum StreamEvent: Codable, Sendable { return } case .support: - if let content = try? container.decode(SupportMessage.self, forKey: .data) { + if let content = try? container.decode(SupportStreamEvent.self, forKey: .data) { self = .support(content) return } diff --git a/ios/Packages/Primitives/Sources/Generated/Support.swift b/ios/Packages/Primitives/Sources/Generated/Support.swift index 512d9f90e3..aa435eeb70 100644 --- a/ios/Packages/Primitives/Sources/Generated/Support.swift +++ b/ios/Packages/Primitives/Sources/Generated/Support.swift @@ -106,6 +106,21 @@ public struct SupportMessageInput: Codable, Equatable, Sendable { } } +public enum SupportTypingStatus: String, Codable, CaseIterable, Equatable, Sendable { + case on + case off +} + +public struct SupportTyping: Codable, Equatable, Sendable { + public let status: SupportTypingStatus + public let agent: SupportAgent + + public init(status: SupportTypingStatus, agent: SupportAgent) { + self.status = status + self.agent = agent + } +} + public enum SupportAction: Codable, Equatable, Sendable { case typing(SupportTypingStatus) case lastSeen @@ -148,7 +163,47 @@ public enum SupportAction: Codable, Equatable, Sendable { } } -public enum SupportTypingStatus: String, Codable, CaseIterable, Equatable, Sendable { - case on - case off +public enum SupportStreamEvent: Codable, Sendable { + case message(SupportMessage) + case typing(SupportTyping) + + enum CodingKeys: String, CodingKey, Codable { + case message, + typing + } + + private enum ContainerCodingKeys: String, CodingKey { + case type, data + } + + public init(from decoder: Decoder) throws { + let container = try decoder.container(keyedBy: ContainerCodingKeys.self) + if let type = try? container.decode(CodingKeys.self, forKey: .type) { + switch type { + case .message: + if let content = try? container.decode(SupportMessage.self, forKey: .data) { + self = .message(content) + return + } + case .typing: + if let content = try? container.decode(SupportTyping.self, forKey: .data) { + self = .typing(content) + return + } + } + } + throw DecodingError.typeMismatch(SupportStreamEvent.self, DecodingError.Context(codingPath: decoder.codingPath, debugDescription: "Wrong type for SupportStreamEvent")) + } + + public func encode(to encoder: Encoder) throws { + var container = encoder.container(keyedBy: ContainerCodingKeys.self) + switch self { + case .message(let content): + try container.encode(CodingKeys.message, forKey: .type) + try container.encode(content, forKey: .data) + case .typing(let content): + try container.encode(CodingKeys.typing, forKey: .type) + try container.encode(content, forKey: .data) + } + } }