Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ConnectionFactory : IConnectionFactory
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;

private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private IRedeliveryPolicy redeliveryPolicy = new DefaultRedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
private ICompressionPolicy compressionPolicy = new CompressionPolicy();

Expand Down
11 changes: 11 additions & 0 deletions src/DefaultRedeliveryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Apache.NMS.Policies;

namespace Apache.NMS.ActiveMQ;

public class DefaultRedeliveryPolicy : RedeliveryPolicy
{
public override int GetOutcome(IDestination destination)
{
return (int) AckType.PoisonAck;
}
}
115 changes: 80 additions & 35 deletions src/MessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class MessageConsumer : IMessageConsumer, IDispatcher
private ThreadPoolExecutor executor;

private event MessageListener listener;
private event AsyncMessageListener asyncListener;

private IRedeliveryPolicy redeliveryPolicy;
private PreviouslyDeliveredMap previouslyDeliveredMessages;
Expand Down Expand Up @@ -315,6 +316,38 @@ public ConsumerTransformerDelegate ConsumerTransformer
public string MessageSelector => info?.Selector;

public event MessageListener Listener
{
add
{
CheckClosed();

if(PrefetchSize == 0)
{
throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
}

var wasStarted = session.Started;

if (wasStarted)
{
session.Stop();
}

listener += value;
session.Redispatch(this, unconsumedMessages);

if (wasStarted)
{
session.Start();
}
}
remove
{
listener -= value;
}
}

public event AsyncMessageListener AsyncListener
{
add
{
Expand All @@ -324,23 +357,26 @@ public event MessageListener Listener
{
throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
}

bool wasStarted = session.Started;

bool wasStarted = this.session.Started;

if(wasStarted)
if (wasStarted)
{
this.session.Stop();
session.Stop();
}

listener += value;
this.session.Redispatch(this, this.unconsumedMessages);
asyncListener += value;
session.Redispatch(this, unconsumedMessages);

if(wasStarted)
if (wasStarted)
{
this.session.Start();
session.Start();
}
}
remove { listener -= value; }
remove
{
asyncListener -= value;
}
}

public IMessage Receive()
Expand Down Expand Up @@ -820,25 +856,27 @@ private void ClearDeliveredList()

public virtual async Task Dispatch_Async(MessageDispatch dispatch)
{
MessageListener listener = this.listener;
var listener = this.listener;
var asyncListener = this.asyncListener;
bool dispatchMessage = false;

try
{
ClearMessagesInProgress();
ClearDeliveredList();

using(await this.unconsumedMessages.SyncRoot.LockAsync().Await())
using (await unconsumedMessages.SyncRoot.LockAsync().Await())
{
if(!this.unconsumedMessages.Closed)
{
if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
{
if(listener != null && this.unconsumedMessages.Running)
if ((listener != null || asyncListener != null) && this.unconsumedMessages.Running)
{
if (RedeliveryExceeded(dispatch))
{
await PosionAckAsync(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries).Await();
var ackType = (AckType) redeliveryPolicy.GetOutcome(dispatch.Destination);
await PoisonAckAsync(dispatch, ackType, $"dispatch to {ConsumerId} exceeds redelivery policy limit:{redeliveryPolicy.MaximumRedeliveries}").Await();
return;
}
else
Expand Down Expand Up @@ -883,9 +921,8 @@ public virtual async Task Dispatch_Async(MessageDispatch dispatch)
}
else
{
Tracer.WarnFormat("Consumer[{0}] suppressing duplicate delivery on connection, poison acking: ({1})",
ConsumerId, dispatch);
await PosionAckAsync(dispatch, "Suppressing duplicate delivery on connection, consumer " + ConsumerId).Await();
Tracer.WarnFormat("Consumer[{0}] suppressing duplicate delivery on connection, poison acking: ({1})", ConsumerId, dispatch);
await PoisonAckAsync(dispatch, AckType.PoisonAck, $"Suppressing duplicate delivery on connection, consumer {ConsumerId}").Await();
}
}
}
Expand All @@ -899,11 +936,15 @@ public virtual async Task Dispatch_Async(MessageDispatch dispatch)

try
{
bool expired = (!IgnoreExpiration && message.IsExpired());
bool expired = !IgnoreExpiration && message.IsExpired();

if(!expired)
{
listener(message);
listener?.Invoke(message);
if (asyncListener != null)
{
await asyncListener.Invoke(message, CancellationToken.None).Await();
}
}

await this.AfterMessageIsConsumedAsync(dispatch, expired).Await();
Expand Down Expand Up @@ -1090,10 +1131,9 @@ private async Task<MessageDispatch> DequeueAsync(TimeSpan timeout)
}
else if (RedeliveryExceeded(dispatch))
{
Tracer.DebugFormat("Consumer[{0}] received with excessive redelivered: {1}",
ConsumerId, dispatch);
await PosionAckAsync(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
"policy limit:" + redeliveryPolicy.MaximumRedeliveries).Await();
Tracer.DebugFormat("Consumer[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch);
var ackType = (AckType) redeliveryPolicy.GetOutcome(dispatch.Destination);
await PoisonAckAsync(dispatch, ackType, $"dispatch to {ConsumerId} exceeds redelivery policy limit:{redeliveryPolicy.MaximumRedeliveries}").Await();

// Refresh the dispatch time
dispatchTime = DateTime.Now;
Expand Down Expand Up @@ -1349,16 +1389,20 @@ private async System.Threading.Tasks.Task ImmediateIndividualTransactedAckAsync(
await this.session.Connection.SyncRequestAsync(ack).Await();
}

private async Task PosionAckAsync(MessageDispatch dispatch, string cause)
private async Task PoisonAckAsync(MessageDispatch dispatch, AckType ackType, string cause)
{
BrokerError poisonCause = new BrokerError();
poisonCause.ExceptionClass = "javax.jms.JMSException";
poisonCause.Message = cause;
BrokerError poisonCause = new BrokerError
{
ExceptionClass = "javax.jms.JMSException",
Message = cause
};

MessageAck posionAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
posionAck.FirstMessageId = dispatch.Message.MessageId;
posionAck.PoisonCause = poisonCause;
await this.session.SendAckAsync(posionAck).Await();
var poisonAck = new MessageAck(dispatch, (byte) ackType, 1)
{
FirstMessageId = dispatch.Message.MessageId,
PoisonCause = poisonCause
};
await this.session.SendAckAsync(poisonAck).Await();
}

private void RegisterSync()
Expand Down Expand Up @@ -1491,7 +1535,8 @@ internal async Task RollbackAsync()
lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
{
// We need to NACK the messages so that they get sent to the DLQ.
MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, deliveredMessages.Count);
var ackType = redeliveryPolicy.GetOutcome(this.info.Destination);
MessageAck ack = new MessageAck(lastMd, (byte) ackType, deliveredMessages.Count);

Tracer.DebugFormat("Consumer[{0}] Poison Ack of {1} messages aft max redeliveries: {2}",
ConsumerId, this.deliveredMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
Expand Down Expand Up @@ -1585,7 +1630,7 @@ internal async Task RollbackAsync()

// Only redispatch if there's an async listener otherwise a synchronous
// consumer will pull them from the local queue.
if(this.listener != null)
if (HasMessageListener())
{
this.session.Redispatch(this, this.unconsumedMessages);
}
Expand Down Expand Up @@ -1674,15 +1719,15 @@ private void CheckClosed()

private void CheckMessageListener()
{
if(this.listener != null)
if(HasMessageListener())
{
throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
}
}

internal bool HasMessageListener()
private bool HasMessageListener()
{
return this.listener != null;
return listener != null || asyncListener != null;
}

protected bool IsAutoAcknowledgeEach
Expand Down
6 changes: 6 additions & 0 deletions src/NmsConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,11 @@ event MessageListener INMSConsumer.Listener
add => ((IMessageConsumer)consumer).Listener += value;
remove => ((IMessageConsumer)consumer).Listener -= value;
}

event AsyncMessageListener INMSConsumer.AsyncListener
{
add => ((IMessageConsumer) consumer).AsyncListener += value;
remove => ((IMessageConsumer) consumer).AsyncListener -= value;
}
}
}
2 changes: 1 addition & 1 deletion src/nms-openwire.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Apache.NMS" Version="2.0.0" />
<PackageReference Include="Apache.NMS" Version="2.2.0" />
<PackageReference Include="SharpZipLib" Version="1.3.3" />
</ItemGroup>

Expand Down
Loading
Loading