Skip to content

Commit cef5b9b

Browse files
authored
Fix re-subscribing after GraphQL subscription is finished by server (#1368)
1 parent a25c0e8 commit cef5b9b

3 files changed

Lines changed: 116 additions & 109 deletions

File tree

juniper_graphql_ws/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,17 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi
66

77

88

9+
## master
10+
11+
### Fixed
12+
13+
- Inability to re-subscribe with the same operation `id` after subscription was completed by server. ([#1368])
14+
15+
[#1368]: /../../pull/1368
16+
17+
18+
19+
920
## [0.5.0] · 2025-09-08
1021
[0.5.0]: /../../tree/juniper_graphql_ws-v0.5.0/juniper_graphql_ws
1122

juniper_graphql_ws/src/graphql_transport_ws/mod.rs

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -158,68 +158,66 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
158158
} => {
159159
let reactions = match msg {
160160
ClientMessage::Subscribe { id, payload } => {
161+
// Prune stoppers which streams are already completed or canceled.
162+
stoppers.retain(|_, tx| !tx.is_canceled());
163+
161164
if stoppers.contains_key(&id) {
162165
// We already have an operation with this id. We must close the connection.
163166
Output::Close {
164167
code: 4409,
165168
message: format!("Subscriber for {id} already exists"),
166169
}
167170
.into_stream()
171+
} else if config.max_in_flight_operations > 0
172+
&& stoppers.len() >= config.max_in_flight_operations
173+
{
174+
// Too many in-flight operations. Just send back a validation error.
175+
stream::iter(vec![
176+
Output::Message(ServerMessage::Error {
177+
id: id.clone(),
178+
payload: GraphQLError::from(RuleError::new(
179+
"Too many in-flight operations.",
180+
&[],
181+
))
182+
.into(),
183+
}),
184+
Output::Message(ServerMessage::Complete { id }),
185+
])
186+
.boxed()
168187
} else {
169-
// Go ahead and prune canceled stoppers before adding a new one.
170-
stoppers.retain(|_, tx| !tx.is_canceled());
171-
172-
if config.max_in_flight_operations > 0
173-
&& stoppers.len() >= config.max_in_flight_operations
174-
{
175-
// Too many in-flight operations. Just send back a validation error.
176-
stream::iter(vec![
177-
Output::Message(ServerMessage::Error {
178-
id: id.clone(),
179-
payload: GraphQLError::from(RuleError::new(
180-
"Too many in-flight operations.",
181-
&[],
182-
))
183-
.into(),
184-
}),
185-
Output::Message(ServerMessage::Complete { id }),
186-
])
187-
.boxed()
188-
} else {
189-
// Create a channel that we can use to cancel the operation.
190-
let (tx, rx) = oneshot::channel::<()>();
191-
stoppers.insert(id.clone(), tx);
192-
193-
// Create the operation stream. This stream will emit Next and Error
194-
// messages, but will not emit Complete – that part is up to us.
195-
let s = Self::start(
196-
id.clone(),
197-
ExecutionParams {
198-
subscribe_payload: payload,
199-
config: config.clone(),
200-
schema: schema.clone(),
201-
},
202-
)
203-
.into_stream()
204-
.flatten();
205-
206-
// Combine this with our oneshot channel so that the stream ends if the
207-
// oneshot is ever fired.
208-
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
209-
let next = match future::select(rx, s.next()).await {
210-
Either::Left(_) => None,
211-
Either::Right((r, rx)) => r.map(|r| (r, rx)),
212-
};
213-
next.map(|(r, rx)| (r, (rx, s)))
214-
});
215-
216-
// Once the stream ends, send the Complete message.
217-
let s = s.chain(
218-
Output::Message(ServerMessage::Complete { id }).into_stream(),
219-
);
220-
221-
s.boxed()
222-
}
188+
// Create a channel that we can use to cancel the operation.
189+
let (tx, rx) = oneshot::channel::<()>();
190+
stoppers.insert(id.clone(), tx);
191+
192+
// Create the operation stream. This stream will emit Next and Error
193+
// messages, but will not emit Complete – that part is up to us.
194+
let s = Self::start(
195+
id.clone(),
196+
ExecutionParams {
197+
subscribe_payload: payload,
198+
config: config.clone(),
199+
schema: schema.clone(),
200+
},
201+
)
202+
.into_stream()
203+
.flatten();
204+
205+
// Combine this with our oneshot channel so that the stream ends if the
206+
// oneshot is ever fired.
207+
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
208+
let next = match future::select(rx, s.next()).await {
209+
Either::Left(_) => None,
210+
Either::Right((r, rx)) => r.map(|r| (r, rx)),
211+
};
212+
next.map(|(r, rx)| (r, (rx, s)))
213+
});
214+
215+
// Once the stream ends, send the Complete message.
216+
let s = s.chain(
217+
Output::Message(ServerMessage::Complete { id }).into_stream(),
218+
);
219+
220+
s.boxed()
223221
}
224222
}
225223
ClientMessage::Complete { id } => {

juniper_graphql_ws/src/graphql_ws/mod.rs

Lines changed: 53 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -141,66 +141,64 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
141141
} => {
142142
let reactions = match msg {
143143
ClientMessage::Start { id, payload } => {
144+
// Prune stoppers which streams are already completed or canceled.
145+
stoppers.retain(|_, tx| !tx.is_canceled());
146+
144147
if stoppers.contains_key(&id) {
145148
// We already have an operation with this id, so we can't start a new
146149
// one.
147150
stream::empty().boxed()
151+
} else if config.max_in_flight_operations > 0
152+
&& stoppers.len() >= config.max_in_flight_operations
153+
{
154+
// Too many in-flight operations. Just send back a validation error.
155+
stream::iter(vec![
156+
Reaction::ServerMessage(ServerMessage::Error {
157+
id: id.clone(),
158+
payload: GraphQLError::from(RuleError::new(
159+
"Too many in-flight operations.",
160+
&[],
161+
))
162+
.into(),
163+
}),
164+
Reaction::ServerMessage(ServerMessage::Complete { id }),
165+
])
166+
.boxed()
148167
} else {
149-
// Go ahead and prune canceled stoppers before adding a new one.
150-
stoppers.retain(|_, tx| !tx.is_canceled());
151-
152-
if config.max_in_flight_operations > 0
153-
&& stoppers.len() >= config.max_in_flight_operations
154-
{
155-
// Too many in-flight operations. Just send back a validation error.
156-
stream::iter(vec![
157-
Reaction::ServerMessage(ServerMessage::Error {
158-
id: id.clone(),
159-
payload: GraphQLError::from(RuleError::new(
160-
"Too many in-flight operations.",
161-
&[],
162-
))
163-
.into(),
164-
}),
165-
Reaction::ServerMessage(ServerMessage::Complete { id }),
166-
])
167-
.boxed()
168-
} else {
169-
// Create a channel that we can use to cancel the operation.
170-
let (tx, rx) = oneshot::channel::<()>();
171-
stoppers.insert(id.clone(), tx);
172-
173-
// Create the operation stream. This stream will emit Data and Error
174-
// messages, but will not emit Complete – that part is up to us.
175-
let s = Self::start(
176-
id.clone(),
177-
ExecutionParams {
178-
start_payload: payload,
179-
config: config.clone(),
180-
schema: schema.clone(),
181-
},
182-
)
183-
.into_stream()
184-
.flatten();
185-
186-
// Combine this with our oneshot channel so that the stream ends if the
187-
// oneshot is ever fired.
188-
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
189-
let next = match future::select(rx, s.next()).await {
190-
Either::Left(_) => None,
191-
Either::Right((r, rx)) => r.map(|r| (r, rx)),
192-
};
193-
next.map(|(r, rx)| (r, (rx, s)))
194-
});
195-
196-
// Once the stream ends, send the Complete message.
197-
let s = s.chain(
198-
Reaction::ServerMessage(ServerMessage::Complete { id })
199-
.into_stream(),
200-
);
201-
202-
s.boxed()
203-
}
168+
// Create a channel that we can use to cancel the operation.
169+
let (tx, rx) = oneshot::channel::<()>();
170+
stoppers.insert(id.clone(), tx);
171+
172+
// Create the operation stream. This stream will emit Data and Error
173+
// messages, but will not emit Complete – that part is up to us.
174+
let s = Self::start(
175+
id.clone(),
176+
ExecutionParams {
177+
start_payload: payload,
178+
config: config.clone(),
179+
schema: schema.clone(),
180+
},
181+
)
182+
.into_stream()
183+
.flatten();
184+
185+
// Combine this with our oneshot channel so that the stream ends if the
186+
// oneshot is ever fired.
187+
let s = stream::unfold((rx, s.boxed()), async |(rx, mut s)| {
188+
let next = match future::select(rx, s.next()).await {
189+
Either::Left(_) => None,
190+
Either::Right((r, rx)) => r.map(|r| (r, rx)),
191+
};
192+
next.map(|(r, rx)| (r, (rx, s)))
193+
});
194+
195+
// Once the stream ends, send the Complete message.
196+
let s = s.chain(
197+
Reaction::ServerMessage(ServerMessage::Complete { id })
198+
.into_stream(),
199+
);
200+
201+
s.boxed()
204202
}
205203
}
206204
ClientMessage::Stop { id } => {

0 commit comments

Comments
 (0)