Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.wallet.core.primitives.StreamEvent
import com.wallet.core.primitives.StreamNotificationUpdate
import com.wallet.core.primitives.StreamTransactionsUpdate
import com.wallet.core.primitives.StreamWalletUpdate
import com.wallet.core.primitives.SupportMessage
import com.wallet.core.primitives.SupportStreamEvent
import com.wallet.core.primitives.WebSocketPricePayload
import kotlinx.coroutines.flow.firstOrNull

Expand Down Expand Up @@ -115,8 +115,11 @@ class StreamEventHandler(
inAppNotificationsDao.put(listOf(update.notification.toRecord()))
}

private suspend fun handleSupport(message: SupportMessage) {
supportMessagesDao.addMessages(listOf(message.toRecord()))
private suspend fun handleSupport(event: SupportStreamEvent) {
when (event) {
is SupportStreamEvent.Message -> supportMessagesDao.addMessages(listOf(event.data.toRecord()))
is SupportStreamEvent.Typing -> {}
}
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ sealed class StreamEvent {
data class FiatTransaction(val data: StreamWalletUpdate): StreamEvent()
@Serializable
@SerialName("support")
data class Support(val data: SupportMessage): StreamEvent()
data class Support(val data: SupportStreamEvent): StreamEvent()
}

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ data class SupportMessageInput (
val content: String
)

@Serializable
enum class SupportTypingStatus(val string: String) {
@SerialName("on")
On("on"),
@SerialName("off")
Off("off"),
}

@Serializable
data class SupportTyping (
val status: SupportTypingStatus,
val agent: SupportAgent
)

@Serializable
sealed class SupportAction {
@Serializable
Expand All @@ -69,10 +83,12 @@ sealed class SupportAction {
}

@Serializable
enum class SupportTypingStatus(val string: String) {
@SerialName("on")
On("on"),
@SerialName("off")
Off("off"),
sealed class SupportStreamEvent {
@Serializable
@SerialName("message")
data class Message(val data: SupportMessage): SupportStreamEvent()
@Serializable
@SerialName("typing")
data class Typing(val data: SupportTyping): SupportStreamEvent()
}

Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ impl MessageConsumer<SupportWebhookPayload, bool> for SupportWebhookConsumer {
};

match self.support_client.process_webhook(&device, &webhook).await {
Ok((notifications, stream_events)) => {
Ok(result) => {
info_with_fields!(
"support webhook processed",
device_id = device_id,
event = webhook.event,
notifications = notifications,
stream_events = stream_events
notifications = result.notifications,
stream_events = result.stream_events
);
Ok(true)
}
Expand Down
5 changes: 4 additions & 1 deletion core/crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ pub use self::websocket::{WebSocketPriceAction, WebSocketPriceActionType, WebSoc
pub mod stream;
pub use self::stream::{StreamBalanceUpdate, StreamEvent, StreamMessage, StreamMessagePrices, StreamTransactionsUpdate, StreamWalletUpdate, device_stream_channel};
pub mod support;
pub use self::support::{SupportAction, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageInput, SupportMessageSender, SupportMessageStatus, SupportTypingStatus};
pub use self::support::{
SupportAction, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageInput, SupportMessageSender, SupportMessageStatus, SupportStreamEvent, SupportTyping,
SupportTypingStatus,
};
pub mod asset_balance;
pub use self::asset_balance::{AddressBalances, AssetBalance, Balance};
pub mod chain_address;
Expand Down
4 changes: 2 additions & 2 deletions core/crates/primitives/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use typeshare::typeshare;

use crate::{AssetId, InAppNotification, SupportMessage, TransactionId, WalletId, WebSocketPricePayload};
use crate::{AssetId, InAppNotification, SupportStreamEvent, TransactionId, WalletId, WebSocketPricePayload};

pub const DEVICE_STREAM_CHANNEL_PREFIX: &str = "stream:device:";

Expand All @@ -22,7 +22,7 @@ pub enum StreamEvent {
Perpetual(StreamWalletUpdate),
InAppNotification(StreamNotificationUpdate),
FiatTransaction(StreamWalletUpdate),
Support(SupportMessage),
Support(SupportStreamEvent),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
15 changes: 15 additions & 0 deletions core/crates/primitives/src/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ pub enum SupportTypingStatus {
Off,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[typeshare(swift = "Sendable, Equatable")]
pub struct SupportTyping {
pub status: SupportTypingStatus,
pub agent: SupportAgent,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[typeshare(swift = "Sendable")]
#[serde(tag = "type", content = "data", rename_all = "camelCase")]
Comment thread
DRadmir marked this conversation as resolved.
pub enum SupportStreamEvent {
Message(SupportMessage),
Typing(SupportTyping),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[typeshare(swift = "Sendable, Equatable")]
#[serde(tag = "type", content = "data", rename_all = "camelCase")]
Expand Down
48 changes: 38 additions & 10 deletions core/crates/support/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use crate::{ChatwootWebhookPayload, constants::EVENT_MESSAGE_CREATED, markdown_plain_text};
use crate::{
ChatwootWebhookPayload,
constants::{EVENT_CONVERSATION_TYPING_OFF, EVENT_CONVERSATION_TYPING_ON, EVENT_MESSAGE_CREATED},
markdown_plain_text,
};
use cacher::CacherClient;
use localizer::LanguageLocalizer;
use primitives::{
Device, GorushNotification, PushNotification, PushNotificationTypes, StreamEvent, SupportMessage, device_stream_channel, push_notification::PushNotificationSupport,
Device, GorushNotification, PushNotification, PushNotificationTypes, StreamEvent, SupportMessage, SupportStreamEvent, SupportTypingStatus, device_stream_channel,
push_notification::PushNotificationSupport,
};
use std::error::Error;
use storage::database::devices::DevicesStore;
use storage::{Database, OptionalExtension};
use streamer::{NotificationsPayload, StreamProducer, StreamProducerQueue};

#[derive(Debug, Default)]
pub struct SupportWebhookResult {
pub notifications: usize,
pub stream_events: usize,
}

pub struct SupportClient {
database: Database,
stream_producer: StreamProducer,
Expand All @@ -28,21 +39,35 @@ impl SupportClient {
Ok(DevicesStore::get_device(&mut self.database.client()?, device_id).optional()?.map(|d| d.as_primitive()))
}

pub async fn process_webhook(&self, device: &Device, payload: &ChatwootWebhookPayload) -> Result<(usize, usize), Box<dyn Error + Send + Sync>> {
if payload.event.as_str() != EVENT_MESSAGE_CREATED {
return Ok((0, 0));
pub async fn process_webhook(&self, device: &Device, payload: &ChatwootWebhookPayload) -> Result<SupportWebhookResult, Box<dyn Error + Send + Sync>> {
match payload.event.as_str() {
EVENT_MESSAGE_CREATED => self.process_message_created(device, payload).await,
EVENT_CONVERSATION_TYPING_ON => self.process_typing(device, payload, SupportTypingStatus::On).await,
EVENT_CONVERSATION_TYPING_OFF => self.process_typing(device, payload, SupportTypingStatus::Off).await,
_ => Ok(SupportWebhookResult::default()),
}
}

let notifications_count = if let Some(notification) = Self::build_notification(device, payload) {
async fn process_message_created(&self, device: &Device, payload: &ChatwootWebhookPayload) -> Result<SupportWebhookResult, Box<dyn Error + Send + Sync>> {
let notifications = if let Some(notification) = Self::build_notification(device, payload) {
self.stream_producer.publish_notifications_support(NotificationsPayload::new(vec![notification])).await?;
1
} else {
0
};

let stream_events_count = self.publish_stream_message(device, payload).await?;
let stream_events = self.publish_stream_message(device, payload).await?;

Ok(SupportWebhookResult { notifications, stream_events })
}

async fn process_typing(&self, device: &Device, payload: &ChatwootWebhookPayload, status: SupportTypingStatus) -> Result<SupportWebhookResult, Box<dyn Error + Send + Sync>> {
let Some(typing) = payload.support_typing(status) else {
return Ok(SupportWebhookResult::default());
};

Ok((notifications_count, stream_events_count))
self.publish_event(device, StreamEvent::Support(SupportStreamEvent::Typing(typing))).await?;
Ok(SupportWebhookResult { notifications: 0, stream_events: 1 })
}

fn build_notification(device: &Device, payload: &ChatwootWebhookPayload) -> Option<GorushNotification> {
Expand All @@ -68,14 +93,17 @@ impl SupportClient {
return Ok(0);
}

let channel = device_stream_channel(&device.id);
self.cacher.publish(&channel, &StreamEvent::Support(message)).await?;
self.publish_event(device, StreamEvent::Support(SupportStreamEvent::Message(message))).await?;
Ok(1)
}

fn should_publish_stream_message(message: &SupportMessage) -> bool {
message.sender.is_agent()
}

async fn publish_event(&self, device: &Device, event: StreamEvent) -> Result<(), Box<dyn Error + Send + Sync>> {
self.cacher.publish(&device_stream_channel(&device.id), &event).await
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions core/crates/support/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ pub(crate) const CHATWOOT_DELIVERY_STATUS_READ: &str = "read";
pub(crate) const CHATWOOT_DELIVERY_STATUS_SENT: &str = "sent";
pub(crate) const CHATWOOT_FILE_TYPE_IMAGE: &str = "image";

pub(crate) const EVENT_CONVERSATION_TYPING_OFF: &str = "conversation_typing_off";
pub(crate) const EVENT_CONVERSATION_TYPING_ON: &str = "conversation_typing_on";
pub(crate) const EVENT_MESSAGE_CREATED: &str = "message_created";

pub(crate) const PATH_CONFIG: &str = "config";
Expand Down
2 changes: 1 addition & 1 deletion core/crates/support/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ mod model;
mod text;

pub use chatwoot::ChatwootClient;
pub use client::SupportClient;
pub use client::{SupportClient, SupportWebhookResult};
pub use model::*;
pub use text::markdown_plain_text;
41 changes: 40 additions & 1 deletion core/crates/support/src/model.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use primitives::{Device, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageSender, SupportMessageStatus, SupportTypingStatus};
use primitives::{Device, SupportAgent, SupportMessage, SupportMessageImage, SupportMessageSender, SupportMessageStatus, SupportTyping, SupportTypingStatus};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

Expand Down Expand Up @@ -63,6 +63,8 @@ pub struct ChatwootWebhookPayload {
#[serde(default)]
pub created_at: Option<ChatwootDateTime>,
pub sender: Option<Sender>,
pub user: Option<WebhookUser>,
pub is_private: Option<bool>,
#[serde(default)]
pub attachments: Vec<Attachment>,
#[serde(default)]
Expand Down Expand Up @@ -112,6 +114,13 @@ pub struct Sender {
pub custom_attributes: Option<CustomAttributes>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookUser {
pub name: Option<String>,
#[serde(rename = "type")]
pub user_type: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Attachment {
pub id: i64,
Expand Down Expand Up @@ -152,6 +161,18 @@ impl ChatwootWebhookPayload {
&self.attachments,
)
}

pub fn support_typing(&self, status: SupportTypingStatus) -> Option<SupportTyping> {
if self.is_private == Some(true) {
return None;
}
let user = self.user.as_ref()?;
if user.user_type.as_deref() != Some("user") {
return None;
}
let name = user.name.clone()?;
Some(SupportTyping { status, agent: SupportAgent { name } })
}
}

impl Message {
Expand Down Expand Up @@ -373,6 +394,24 @@ mod tests {
assert_eq!(messages[0].status, SupportMessageStatus::Sent);
}

#[test]
fn test_support_typing() {
let agent_on: ChatwootWebhookPayload = serde_json::from_str(include_str!("../tests/testdata/chatwoot_conversation_typing_on.json")).unwrap();

let typing = agent_on.support_typing(SupportTypingStatus::On).unwrap();

assert_eq!(typing.status, SupportTypingStatus::On);
assert_eq!(typing.agent, SupportAgent { name: "Test Agent".to_string() });
assert_eq!(agent_on.get_device_id(), Some("test-device-id".to_string()));

let private: ChatwootWebhookPayload = serde_json::from_str(r#"{"event":"conversation_typing_on","is_private":true,"user":{"name":"Test Agent","type":"user"}}"#).unwrap();
assert_eq!(private.support_typing(SupportTypingStatus::On), None);

let contact: ChatwootWebhookPayload =
serde_json::from_str(r#"{"event":"conversation_typing_on","is_private":false,"user":{"name":"test-user","type":"contact"}}"#).unwrap();
assert_eq!(contact.support_typing(SupportTypingStatus::On), None);
}

#[test]
fn test_support_messages_keep_missing_private_strict() {
let message: Message = serde_json::from_str(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"event": "conversation_typing_on",
"is_private": false,
"user": {
"name": "Test Agent",
"type": "user"
},
"conversation": {
"id": 1,
"meta": {
"sender": {
"custom_attributes": {
"device_id": "test-device-id"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ public struct StreamEventService: Sendable {
Task { await perform { try await priceAlertService.update() } }
case let .fiatTransaction(update):
Task { await perform { try await handleFiatTransactionUpdate(update) } }
case let .support(message):
await perform { try supportChatStore.addMessages([message]) }
case let .support(event):
switch event {
case let .message(message):
await perform { try supportChatStore.addMessages([message]) }
case .typing:
break
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions ios/Packages/Primitives/Sources/Generated/Stream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public enum StreamEvent: Codable, Sendable {
case perpetual(StreamWalletUpdate)
case inAppNotification(StreamNotificationUpdate)
case fiatTransaction(StreamWalletUpdate)
case support(SupportMessage)
case support(SupportStreamEvent)

enum CodingKeys: String, CodingKey, Codable {
case prices,
Expand Down Expand Up @@ -130,7 +130,7 @@ public enum StreamEvent: Codable, Sendable {
return
}
case .support:
if let content = try? container.decode(SupportMessage.self, forKey: .data) {
if let content = try? container.decode(SupportStreamEvent.self, forKey: .data) {
self = .support(content)
return
}
Expand Down
Loading
Loading