Skip to content

Commit d8cf428

Browse files
committed
feat(eventbus): support per-subscriber async dispatch with graceful drain
1 parent d9ebf33 commit d8cf428

13 files changed

Lines changed: 221 additions & 22 deletions

File tree

pkg/config/app/admin.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/apache/dubbo-admin/pkg/config/diagnostics"
2828
"github.com/apache/dubbo-admin/pkg/config/discovery"
2929
"github.com/apache/dubbo-admin/pkg/config/engine"
30+
"github.com/apache/dubbo-admin/pkg/config/eventbus"
3031
"github.com/apache/dubbo-admin/pkg/config/log"
3132
"github.com/apache/dubbo-admin/pkg/config/observability"
3233
"github.com/apache/dubbo-admin/pkg/config/store"
@@ -48,18 +49,22 @@ type AdminConfig struct {
4849
Discovery []*discovery.Config `json:"discovery" yaml:"discovery"`
4950
// Engine configuration
5051
Engine *engine.Config `json:"engine" yaml:"engine"`
52+
// EventBus configuration
53+
EventBus *eventbus.Config `json:"eventBus,omitempty" yaml:"eventBus,omitempty"`
5154
}
5255

5356
var _ = &AdminConfig{}
5457

5558
var DefaultAdminConfig = func() AdminConfig {
59+
eventBusCfg := eventbus.Default()
5660
return AdminConfig{
5761
Log: log.DefaultLogConfig(),
5862
Store: store.DefaultStoreConfig(),
5963
Engine: engine.DefaultResourceEngineConfig(),
6064
Observability: observability.DefaultObservabilityConfig(),
6165
Diagnostics: diagnostics.DefaultDiagnosticsConfig(),
6266
Console: console.DefaultConsoleConfig(),
67+
EventBus: &eventBusCfg,
6368
}
6469
}
6570

@@ -115,7 +120,7 @@ func (c AdminConfig) PostProcess() error {
115120
)
116121
}
117122

118-
func (c AdminConfig) Validate() error {
123+
func (c *AdminConfig) Validate() error {
119124
if c.Log == nil {
120125
c.Log = log.DefaultLogConfig()
121126
} else if err := c.Log.Validate(); err != nil {
@@ -160,6 +165,12 @@ func (c AdminConfig) Validate() error {
160165
} else if err := c.Engine.Validate(); err != nil {
161166
return bizerror.Wrap(err, bizerror.ConfigError, "engine config validation failed")
162167
}
168+
if c.EventBus == nil {
169+
cfg := eventbus.Default()
170+
c.EventBus = &cfg
171+
} else if err := c.EventBus.Validate(); err != nil {
172+
return bizerror.Wrap(err, bizerror.ConfigError, "event bus config validation failed")
173+
}
163174
return nil
164175
}
165176

pkg/config/eventbus/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,6 @@ func (c Config) Validate() error {
2929

3030
func Default() Config {
3131
return Config{
32-
BufferSize: 100,
32+
BufferSize: 1024,
3333
}
3434
}

pkg/core/discovery/subscriber/instance.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func (s *InstanceEventSubscriber) Name() string {
5555
return "Discovery-" + s.ResourceKind().ToString()
5656
}
5757

58+
func (s *InstanceEventSubscriber) AsyncEnabled() bool {
59+
return true
60+
}
61+
5862
func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error {
5963
newObj, ok := event.NewObj().(*meshresource.InstanceResource)
6064
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/nacos_service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func (n *NacosServiceEventSubscriber) Name() string {
5757
return "Nacos2Discovery-" + n.ResourceKind().ToString()
5858
}
5959

60+
func (n *NacosServiceEventSubscriber) AsyncEnabled() bool {
61+
return true
62+
}
63+
6064
func (n *NacosServiceEventSubscriber) ProcessEvent(event events.Event) error {
6165
newObj, ok := event.NewObj().(*meshresource.NacosServiceResource)
6266
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/rpc_instance.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func (s *RPCInstanceEventSubscriber) ResourceKind() coremodel.ResourceKind {
6363
return meshresource.RPCInstanceKind
6464
}
6565

66+
func (s *RPCInstanceEventSubscriber) AsyncEnabled() bool {
67+
return true
68+
}
69+
6670
func (s *RPCInstanceEventSubscriber) ProcessEvent(event events.Event) error {
6771
newObj, ok := event.NewObj().(*meshresource.RPCInstanceResource)
6872
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/service_consumer_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (s *ServiceConsumerMetadataEventSubscriber) Name() string {
5353
return "Discovery-" + s.ResourceKind().ToString()
5454
}
5555

56+
func (s *ServiceConsumerMetadataEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (s *ServiceConsumerMetadataEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ServiceConsumerMetadataResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/service_provider_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (s *ServiceProviderMetadataEventSubscriber) Name() string {
5353
return "Discovery-" + s.ResourceKind().ToString()
5454
}
5555

56+
func (s *ServiceProviderMetadataEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (s *ServiceProviderMetadataEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ServiceProviderMetadataResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/zk_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (z *ZKConfigEventSubscriber) Name() string {
5353
return "Discovery-" + z.ResourceKind().ToString()
5454
}
5555

56+
func (z *ZKConfigEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ZKConfigResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/zk_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (z *ZKMetadataEventSubscriber) Name() string {
5353
return "Discovery-" + z.ResourceKind().ToString()
5454
}
5555

56+
func (z *ZKMetadataEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/engine/subscriber/runtime_instance.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func (s *RuntimeInstanceEventSubscriber) Name() string {
5656
return "Engine-" + s.ResourceKind().ToString()
5757
}
5858

59+
func (s *RuntimeInstanceEventSubscriber) AsyncEnabled() bool {
60+
return true
61+
}
62+
5963
func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) error {
6064
newObj, ok := event.NewObj().(*meshresource.RuntimeInstanceResource)
6165
if !ok && event.NewObj() != nil {

0 commit comments

Comments
 (0)