11/*
22 * lws System Message Distribution
33 *
4- * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
4+ * Copyright (C) 2019 - 2025 Andy Green <andy@warmcat.com>
55 *
66 * Permission is hereby granted, free of charge, to any person obtaining a copy
77 * of this software and associated documentation files (the "Software"), to
@@ -80,8 +80,13 @@ lws_smd_msg_free(void **ppay)
8080}
8181
8282#if defined(LWS_SMD_DEBUG )
83+
84+ /*
85+ * Caller must hold both the peers and messages locks
86+ */
87+
8388static void
84- lws_smd_dump (lws_smd_t * smd )
89+ _lws_smd_dump (lws_smd_t * smd )
8590{
8691 int n = 1 ;
8792
@@ -115,6 +120,8 @@ _lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
115120
116121/*
117122 * Figure out what to set the initial refcount for the message to
123+ *
124+ * Caller must hold both the peers and messages locks
118125 */
119126
120127static int
@@ -200,13 +207,20 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
200207 LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof (* msg ));
201208
202209 if (ctx -> smd .owner_messages .count >= ctx -> smd_queue_depth ) {
203- lwsl_cx_warn (ctx , "rejecting message on queue depth %d" ,
204- (int )ctx -> smd .owner_messages .count );
210+ // lwsl_cx_debug (ctx, "rejecting message on queue depth %d",
211+ // (int)ctx->smd.owner_messages.count);
205212 /* reject the message due to max queue depth reached */
206213 return 1 ;
207214 }
208215
209- if (!ctx -> smd .delivering &&
216+ /*
217+ * If a peer's rx callback itself sends a message, we arrive here already
218+ * holding lock_peers and would deadlock trying to take it again. During
219+ * the callback ctx->smd.delivering is set and ctx->smd.tid_holding names
220+ * the delivering thread, so only that thread skips taking the lock.
221+ */
222+
223+ if ((!ctx -> smd .delivering || !lws_thread_is (ctx -> smd .tid_holding )) &&
210224 lws_mutex_lock (ctx -> smd .lock_peers )) /* +++++++++++++++ peers */
211225 return 1 ; /* For Coverity */
212226
@@ -220,7 +234,7 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
220234 lws_mutex_unlock (ctx -> smd .lock_messages ); /* --------------- messages */
221235
222236 lws_free (msg );
223- if (!ctx -> smd .delivering )
237+ if (!ctx -> smd .delivering || ! lws_thread_is ( ctx -> smd . tid_holding ) )
224238 lws_mutex_unlock (ctx -> smd .lock_peers ); /* ------------- peers */
225239
226240 return 0 ;
@@ -252,13 +266,13 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
252266#if defined(LWS_SMD_DEBUG )
253267 lwsl_smd ("%s: added %p (refc %u) depth now %d\n" , __func__ ,
254268 msg , msg -> refcount , ctx -> smd .owner_messages .count );
255- lws_smd_dump (& ctx -> smd );
269+ _lws_smd_dump (& ctx -> smd );
256270#endif
257271
258272 lws_mutex_unlock (ctx -> smd .lock_messages ); /* --------------- messages */
259273
260274bail :
261- if (!ctx -> smd .delivering )
275+ if (!ctx -> smd .delivering || ! lws_thread_is ( ctx -> smd . tid_holding ) )
262276 lws_mutex_unlock (ctx -> smd .lock_peers ); /* ------------- peers */
263277
264278 /* we may be happening from another thread context */
@@ -537,9 +551,23 @@ _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
537551 (unsigned int )msg -> _class , (int )msg -> length ,
538552 pr );
539553
554+ /*
555+ * We call the peer's callback to deliver the message.
556+ * We hold the peer lock for the duration.
557+ * That's tricky because if, in the callback, the peer uses smd
558+ * apis to send, we will deadlock if we try to grab the
559+ * peer lock as usual in there.
560+ *
561+ * Another way to express this is that for this thread
562+ * (only) we know we already hold the peer lock.
563+ */
564+
565+ ctx -> smd .tid_holding = lws_thread_id ();
566+ ctx -> smd .delivering = 1 ;
540567 pr -> cb (pr -> opaque , msg -> _class , msg -> timestamp ,
541568 ((uint8_t * )& msg [1 ]) + LWS_SMD_SS_RX_HEADER_LEN_EFF ,
542569 (size_t )msg -> length );
570+ ctx -> smd .delivering = 0 ;
543571#if !defined(__COVERITY__ )
544572 assert (msg -> refcount );
545573#endif
@@ -577,7 +605,6 @@ lws_smd_msg_distribute(struct lws_context *ctx)
577605 if (!ctx -> smd .owner_messages .count )
578606 return 0 ;
579607
580- ctx -> smd .delivering = 1 ;
581608
582609 do {
583610 more = 0 ;
@@ -595,7 +622,6 @@ lws_smd_msg_distribute(struct lws_context *ctx)
595622 lws_mutex_unlock (ctx -> smd .lock_peers ); /* ------------- peers */
596623 } while (more );
597624
598- ctx -> smd .delivering = 0 ;
599625
600626 return 0 ;
601627}
@@ -614,11 +640,11 @@ lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
614640 pr -> _class_filter = _class_filter ;
615641 pr -> ctx = ctx ;
616642
617- if (!ctx -> smd .delivering &&
643+ if (( !ctx -> smd .delivering || ! lws_thread_is ( ctx -> smd . tid_holding )) &&
618644 lws_mutex_lock (ctx -> smd .lock_peers )) { /* +++++++++++++++ peers */
619- lws_free (pr );
620- return NULL ; /* For Coverity */
621- }
645+ lws_free (pr );
646+ return NULL ; /* For Coverity */
647+ }
622648
623649 /*
624650 * Let's lock the message list before adding this peer... because...
@@ -659,7 +685,7 @@ lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
659685 (unsigned int )ctx -> smd .owner_peers .count );
660686
661687bail1 :
662- if (!ctx -> smd .delivering )
688+ if (!ctx -> smd .delivering || ! lws_thread_is ( ctx -> smd . tid_holding ) )
663689 lws_mutex_unlock (ctx -> smd .lock_peers ); /* ------------- peers */
664690
665691 return pr ;
@@ -670,12 +696,13 @@ lws_smd_unregister(struct lws_smd_peer *pr)
670696{
671697 lws_smd_t * smd = lws_container_of (pr -> list .owner , lws_smd_t , owner_peers );
672698
673- if (!smd -> delivering &&
699+ if (( !smd -> delivering || ! lws_thread_is ( smd -> tid_holding )) &&
674700 lws_mutex_lock (smd -> lock_peers )) /* +++++++++++++++++++ peers */
675701 return ; /* For Coverity */
676702 lwsl_cx_notice (pr -> ctx , "destroying peer %p" , pr );
677703 _lws_smd_peer_destroy (pr );
678- if (!smd -> delivering )
704+
705+ if (!smd -> delivering || !lws_thread_is (smd -> tid_holding ))
679706 lws_mutex_unlock (smd -> lock_peers ); /* ----------------- peers */
680707}
681708
@@ -697,7 +724,8 @@ lws_smd_message_pending(struct lws_context *ctx)
697724 * have been hanging around too long
698725 */
699726
700- if (lws_mutex_lock (ctx -> smd .lock_peers )) /* +++++++++++++++++++++++ peers */
727+ if ((!ctx -> smd .delivering || !lws_thread_is (ctx -> smd .tid_holding )) &&
728+ lws_mutex_lock (ctx -> smd .lock_peers )) /* +++++++++++++++++++++++ peers */
701729 return 1 ; /* For Coverity */
702730 if (lws_mutex_lock (ctx -> smd .lock_messages )) /* +++++++++++++++++ messages */
703731 goto bail ; /* For Coverity */
@@ -758,7 +786,8 @@ lws_smd_message_pending(struct lws_context *ctx)
758786 ret = 0 ;
759787
760788bail :
761- lws_mutex_unlock (ctx -> smd .lock_peers ); /* --------------------- peers */
789+ if (!ctx -> smd .delivering || !lws_thread_is (ctx -> smd .tid_holding ))
790+ lws_mutex_unlock (ctx -> smd .lock_peers ); /* --------------------- peers */
762791
763792 return ret ;
764793}
0 commit comments