Skip to content

Commit d6918a8

Browse files
50U10FCA7tyranron
andauthored
Fix errors handling in GraphQL subscriptions (#1371)
- fix errors mapping in `#[graphql_subscription]` macro expansion in `juniper_codegen` crate - fix memory leak caused by incorrect errors handling in GraphQL subscriptions machinery - fix `ValuesStream` to return batch of `ExecutionError`s instead of a single one in `juniper_crate` - add `ConnectionConfig::panic_handler` field and `ConnectionConfig::with_panic_handler()` method allowing to specify `PanicHandler` for panics happened during execution of operations in `juniper_graphql_ws` crate Additionally: - merge `graphql_transport_ws::NextPayload` and `graphql_ws::DataPayload` into a single struct in `juniper_graphql_ws` crate - reduce excessive `Box`ing of `Stream`s in `juniper_graphql_ws` crate Co-authored-by: Kai Ren <tyranron@gmail.com>
1 parent 4dc4813 commit d6918a8

14 files changed

Lines changed: 301 additions & 153 deletions

File tree

juniper/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
2020
- Renamed `ast::Operation::variable_definitions` field to `variables_definition`.
2121
- Changed `ScalarToken::String` to contain raw quoted and escaped `StringLiteral` (was unquoted but escaped string before). ([#1349])
2222
- Added `LexerError::UnterminatedBlockString` variant. ([#1349])
23+
- Fixed `ValuesStream` to return batch of `ExecutionError`s instead of a single one. ([#1371])
2324

2425
### Added
2526

@@ -62,6 +63,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
6263
- Incorrect `__Type.specifiedByUrl` field to `__Type.specifiedByURL`. ([#1348])
6364
- Missing `@specifiedBy(url:)` directive in [SDL] generated by `RootNode::as_sdl()` and `RootNode::as_document()` methods. ([#1348])
6465
- Incorrect double escaping in `ScalarToken::String` `Display`ing. ([#1349])
66+
- Memory leak caused by incorrect error handling in `#[graphql_subscription]` macro expansion. ([#1371])
6567

6668
[#864]: /../../issues/864
6769
[#1055]: /../../issues/1055
@@ -75,6 +77,7 @@ All user visible changes to `juniper` crate will be documented in this file. Thi
7577
[#1355]: /../../pull/1355
7678
[#1358]: /../../pull/1358
7779
[#1361]: /../../pull/1361
80+
[#1371]: /../../pull/1371
7881
[graphql/graphql-spec#525]: https://github.com/graphql/graphql-spec/pull/525
7982
[graphql/graphql-spec#687]: https://github.com/graphql/graphql-spec/issues/687
8083
[graphql/graphql-spec#805]: https://github.com/graphql/graphql-spec/pull/805

juniper/src/executor/mod.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::{
55
cmp::Ordering,
66
collections::HashMap,
77
fmt::{Debug, Display},
8+
mem,
9+
pin::Pin,
810
sync::{Arc, RwLock},
911
};
1012

@@ -228,17 +230,17 @@ impl<S> FieldError<S> {
228230
}
229231
}
230232

231-
/// The result of resolving the value of a field of type `T`
233+
/// [`Result`] of resolving the value of a field of type `T`.
232234
pub type FieldResult<T, S = DefaultScalarValue> = Result<T, FieldError<S>>;
233235

234-
/// The result of resolving an unspecified field
236+
/// [`Result`] of resolving an unspecified field.
235237
pub type ExecutionResult<S = DefaultScalarValue> = Result<Value<S>, FieldError<S>>;
236238

237-
/// Boxed `Stream` yielding `Result<Value<S>, ExecutionError<S>>`
239+
/// [`Box`]ed [`Stream`] yielding `Result<Value<S>, Vec<ExecutionError<S>>>`.
238240
pub type ValuesStream<'a, S = DefaultScalarValue> =
239-
std::pin::Pin<Box<dyn Stream<Item = Result<Value<S>, ExecutionError<S>>> + Send + 'a>>;
241+
Pin<Box<dyn Stream<Item = Result<Value<S>, Vec<ExecutionError<S>>>> + Send + 'a>>;
240242

241-
/// The map of variables used for substitution during query execution
243+
/// [`HashMap`] of variables used for substitution during query execution.
242244
pub type Variables<S = DefaultScalarValue> = HashMap<String, InputValue<S>>;
243245

244246
/// Custom error handling trait to enable error types other than [`FieldError`]
@@ -682,6 +684,12 @@ where
682684
}
683685
}
684686

687+
/// Takes errors from this [`Executor`] clearing its internal [`ExecutionError`]s buffer.
688+
#[must_use]
689+
pub fn take_errors(&self) -> Vec<ExecutionError<S>> {
690+
mem::take(&mut self.errors.write().unwrap())
691+
}
692+
685693
/// Construct a lookahead selection for the current selection.
686694
///
687695
/// This allows seeing the whole selection and perform operations

juniper/src/tests/subscriptions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ fn create_and_execute(
9090
) -> Result<
9191
(
9292
Vec<String>,
93-
Vec<Vec<Result<Value<DefaultScalarValue>, ExecutionError<DefaultScalarValue>>>>,
93+
Vec<Vec<Result<Value<DefaultScalarValue>, Vec<ExecutionError<DefaultScalarValue>>>>>,
9494
),
9595
Vec<ExecutionError<DefaultScalarValue>>,
9696
> {

juniper_codegen/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@ All user visible changes to `juniper_codegen` crate will be documented in this f
1818
- Placing `#[graphql(deprecated)]` attribute on method arguments in `#[graphql_object]` and `#[graphql_interface]` macros.
1919
- Support of `#[graphql(rename_all = "snake_case")]` attribute. ([#1354])
2020

21+
### Fixed
22+
23+
- Memory leak caused by incorrect error handling in `#[graphql_subscription]` macro expansion. ([#1371])
24+
2125
[#864]: /../../issues/864
2226
[#1055]: /../../issues/1055
2327
[#1062]: /../../issues/1062
2428
[#1347]: /../../issues/1347
2529
[#1348]: /../../pull/1348
2630
[#1354]: /../../pull/1354
31+
[#1371]: /../../pull/1371
2732
[graphql/graphql-spec#525]: https://github.com/graphql/graphql-spec/pull/525
2833
[graphql/graphql-spec#805]: https://github.com/graphql/graphql-spec/pull/805
2934
[graphql/graphql-spec#825]: https://github.com/graphql/graphql-spec/pull/825

juniper_codegen/src/common/field/mod.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,15 +359,20 @@ impl Definition {
359359
::core::option::Option::Some((ctx, r)),
360360
) => {
361361
let sub = ex.replaced_context(ctx);
362-
sub.resolve_with_ctx_async(&(), &r)
362+
let val = sub.resolve_with_ctx_async(&(), &r)
363363
.await
364-
.map_err(|e| ex.new_error(e))
364+
.map_err(|e| ::std::vec![ex.new_error(e)])?;
365+
let errs = sub.take_errors();
366+
if !errs.is_empty() {
367+
return ::core::result::Result::Err(errs)
368+
}
369+
::core::result::Result::Ok(val)
365370
}
366371
::core::result::Result::Ok(::core::option::Option::None) => {
367372
::core::result::Result::Ok(::juniper::Value::null())
368373
}
369374
::core::result::Result::Err(e) => {
370-
::core::result::Result::Err(ex.new_error(e))
375+
::core::result::Result::Err(::std::vec![ex.new_error(e)])
371376
}
372377
}
373378
}

juniper_graphql_ws/CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,22 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi
1616
- Made [WebSocket] connection closed once `ConnectionConfig::keep_alive::timeout` is reached in [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-6.0.7]. ([#1367])
1717
> **COMPATIBILITY**: Previously, a [WebSocket] connection was kept alive, even when clients do not respond to server's `Pong` messages at all. To preserve the previous behavior, the `ConnectionConfig::keep_alive::timeout` should be set to `Duration:::ZERO`.
1818
19+
### Added
20+
21+
- `ConnectionConfig::panic_handler` field and `ConnectionConfig::with_panic_handler()` method allowing to specify `PanicHandler` for panics happened during execution of [GraphQL] operations. ([#1371])
22+
23+
### Changed
24+
25+
- Merged `graphql_transport_ws::NextPayload` and `graphql_ws::DataPayload` into a single struct. ([#1371])
26+
1927
### Fixed
2028

21-
- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368])
29+
- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368])
2230

2331
[#1367]: /../../pull/1367
2432
[#1368]: /../../pull/1368
2533
[#1369]: /../../pull/1369
34+
[#1371]: /../../pull/1371
2635
[proto-6.0.7]: https://github.com/enisdenjo/graphql-ws/blob/v6.0.7/PROTOCOL.md
2736

2837

juniper_graphql_ws/src/graphql_transport_ws/mod.rs

Lines changed: 93 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ mod client_message;
1414
mod server_message;
1515

1616
use std::{
17-
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin,
18-
sync::Arc, time::Duration,
17+
collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned,
18+
panic::AssertUnwindSafe, pin::Pin, sync::Arc, time::Duration,
1919
};
2020

2121
use derive_more::with_trait::From;
2222
use juniper::{
23-
GraphQLError, RuleError, ScalarValue,
23+
GraphQLError, RuleError, ScalarValue, Value,
2424
futures::{
2525
Sink, Stream,
2626
channel::oneshot,
@@ -40,7 +40,7 @@ pub use self::{
4040

4141
struct ExecutionParams<S: Schema> {
4242
subscribe_payload: SubscribePayload<S::ScalarValue>,
43-
config: ConnectionConfig<S::Context>,
43+
config: ConnectionConfig<S::Context, S::ScalarValue>,
4444
schema: S,
4545
}
4646

@@ -71,9 +71,9 @@ pub enum Output<S: ScalarValue> {
7171
}
7272

7373
impl<S: ScalarValue + Send> Output<S> {
74-
/// Converts the reaction into a one-item stream.
75-
fn into_stream(self) -> BoxStream<'static, Self> {
76-
stream::once(future::ready(self)).boxed()
74+
/// Wraps this [`Output`] into a singe-item [`Stream`].
75+
fn into_stream(self) -> stream::Once<future::Ready<Self>> {
76+
stream::once(future::ready(self))
7777
}
7878
}
7979

@@ -82,7 +82,7 @@ enum ConnectionState<S: Schema, I: Init<S::ScalarValue, S::Context>> {
8282
PreInit { init: I, schema: S },
8383
/// Active is the state after a ConnectionInit message has been accepted.
8484
Active {
85-
config: ConnectionConfig<S::Context>,
85+
config: ConnectionConfig<S::Context, S::ScalarValue>,
8686
stoppers: HashMap<String, oneshot::Sender<()>>,
8787
ping: Arc<Notify>,
8888
schema: S,
@@ -107,26 +107,23 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
107107

108108
let ping = Arc::new(Notify::new());
109109

110-
let mut s = Output::Message(ServerMessage::ConnectionAck)
111-
.into_stream()
112-
.boxed();
110+
let s = Output::Message(ServerMessage::ConnectionAck).into_stream();
113111

114-
if keep_alive_interval > Duration::from_secs(0) {
115-
s = s
116-
.chain(Output::Message(ServerMessage::Pong).into_stream())
117-
.boxed();
118-
s = s
112+
let s = if keep_alive_interval > Duration::from_secs(0) {
113+
s.chain(Output::Message(ServerMessage::Pong).into_stream())
119114
.chain(stream::repeat(()).then(move |()| {
120115
tokio::time::sleep(keep_alive_interval)
121116
.map(|()| Output::Message(ServerMessage::Pong))
122117
}))
123-
.boxed();
124-
}
118+
.right_stream()
119+
} else {
120+
s.left_stream()
121+
};
125122

126-
if keep_alive_timeout > Duration::from_secs(0) {
123+
let s = if keep_alive_timeout > Duration::from_secs(0) {
127124
let ping_rx = ping.clone();
128-
s = stream::select_all([
129-
s,
125+
stream::select_all([
126+
s.boxed(),
130127
stream::repeat(())
131128
.then(move |()| {
132129
let ping_rx = ping_rx.clone();
@@ -143,8 +140,10 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
143140
.filter_map(future::ready)
144141
.boxed(),
145142
])
146-
.boxed();
147-
}
143+
.boxed()
144+
} else {
145+
s.boxed()
146+
};
148147

149148
(
150149
Self::Active {
@@ -199,6 +198,7 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
199198
message: format!("Subscriber for {id} already exists"),
200199
}
201200
.into_stream()
201+
.boxed()
202202
} else if config.max_in_flight_operations > 0
203203
&& stoppers.len() >= config.max_in_flight_operations
204204
{
@@ -286,35 +286,80 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
286286

287287
let params = Arc::new(params);
288288

289-
// Try to execute this as a query or mutation.
290-
match juniper::execute(
289+
let fut = juniper::execute(
291290
&params.subscribe_payload.query,
292291
params.subscribe_payload.operation_name.as_deref(),
293292
params.schema.root_node(),
294293
&params.subscribe_payload.variables,
295294
&params.config.context,
296295
)
297-
.await
298-
{
299-
Ok((data, errors)) => {
300-
return Output::Message(ServerMessage::Next {
301-
id: id.clone(),
302-
payload: NextPayload { data, errors },
303-
})
304-
.into_stream();
305-
}
306-
Err(GraphQLError::IsSubscription) => {}
307-
Err(e) => {
308-
return Output::Message(ServerMessage::Error {
296+
.map_ok(|(data, errors)| {
297+
Output::Message(ServerMessage::Next {
298+
id: id.clone(),
299+
payload: NextPayload { data, errors },
300+
})
301+
.into_stream()
302+
.left_stream()
303+
})
304+
.unwrap_or_else(|e| {
305+
if matches!(e, GraphQLError::IsSubscription) {
306+
SubscriptionStart::new(id.clone(), params.clone()).right_stream()
307+
} else {
308+
Output::Message(ServerMessage::Error {
309309
id: id.clone(),
310310
payload: ErrorPayload::new(Box::new(params.clone()), e),
311311
})
312-
.into_stream();
312+
.into_stream()
313+
.left_stream()
313314
}
315+
});
316+
if let Some(panic_handler) = params.config.panic_handler.as_ref().map(Arc::clone) {
317+
let stream = AssertUnwindSafe(fut)
318+
.catch_unwind()
319+
.await
320+
.unwrap_or_else(|e| {
321+
if let Some(e) = panic_handler(e, &params.config.context) {
322+
Output::Message(ServerMessage::Next {
323+
id: id.clone(),
324+
payload: NextPayload {
325+
data: Value::null(),
326+
errors: vec![e],
327+
},
328+
})
329+
} else {
330+
Output::Close {
331+
code: 1000,
332+
message: "Operation execution panicked".into(),
333+
}
334+
}
335+
.into_stream()
336+
.left_stream()
337+
});
338+
AssertUnwindSafe(stream)
339+
.catch_unwind()
340+
.map(move |res| match res {
341+
Ok(item) => item,
342+
Err(e) => {
343+
if let Some(e) = panic_handler(e, &params.config.context) {
344+
Output::Message(ServerMessage::Next {
345+
id: id.clone(),
346+
payload: NextPayload {
347+
data: Value::null(),
348+
errors: vec![e],
349+
},
350+
})
351+
} else {
352+
Output::Close {
353+
code: 1000,
354+
message: "Subscription execution panicked".into(),
355+
}
356+
}
357+
}
358+
})
359+
.boxed()
360+
} else {
361+
fut.await.boxed()
314362
}
315-
316-
// Try to execute as a subscription.
317-
SubscriptionStart::new(id, params.clone()).boxed()
318363
}
319364
}
320365

@@ -349,12 +394,12 @@ struct SubscriptionStart<S: Schema> {
349394
}
350395

351396
impl<S: Schema> SubscriptionStart<S> {
352-
fn new(id: String, params: Arc<ExecutionParams<S>>) -> Pin<Box<Self>> {
353-
Box::pin(Self {
397+
fn new(id: String, params: Arc<ExecutionParams<S>>) -> Self {
398+
Self {
354399
params,
355400
state: SubscriptionStartState::Init { id },
356401
_marker: PhantomPinned,
357-
})
402+
}
358403
}
359404
}
360405

@@ -543,7 +588,8 @@ where
543588
code: 1000,
544589
message: "Normal Closure".into(),
545590
}
546-
.into_stream(),
591+
.into_stream()
592+
.boxed(),
547593
);
548594
ConnectionSinkState::Closed
549595
}
@@ -554,7 +600,8 @@ where
554600
code: 4400,
555601
message: e.to_string(),
556602
}
557-
.into_stream(),
603+
.into_stream()
604+
.boxed(),
558605
);
559606
ConnectionSinkState::Closed
560607
}

0 commit comments

Comments
 (0)