Skip to content

Commit a783f3c

Browse files
committed
feat(eventbus): implement async dispatch core runtime path
1 parent b809b14 commit a783f3c

12 files changed

Lines changed: 214 additions & 20 deletions

File tree

pkg/config/app/admin.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/apache/dubbo-admin/pkg/config/diagnostics"
2828
"github.com/apache/dubbo-admin/pkg/config/discovery"
2929
"github.com/apache/dubbo-admin/pkg/config/engine"
30+
"github.com/apache/dubbo-admin/pkg/config/eventbus"
3031
"github.com/apache/dubbo-admin/pkg/config/log"
3132
"github.com/apache/dubbo-admin/pkg/config/observability"
3233
"github.com/apache/dubbo-admin/pkg/config/store"
@@ -48,18 +49,22 @@ type AdminConfig struct {
4849
Discovery []*discovery.Config `json:"discovery" yaml:"discovery"`
4950
// Engine configuration
5051
Engine *engine.Config `json:"engine" yaml:"engine"`
52+
// EventBus configuration
53+
EventBus *eventbus.Config `json:"eventBus,omitempty" yaml:"eventBus,omitempty"`
5154
}
5255

5356
var _ = &AdminConfig{}
5457

5558
var DefaultAdminConfig = func() AdminConfig {
59+
eventBusCfg := eventbus.Default()
5660
return AdminConfig{
5761
Log: log.DefaultLogConfig(),
5862
Store: store.DefaultStoreConfig(),
5963
Engine: engine.DefaultResourceEngineConfig(),
6064
Observability: observability.DefaultObservabilityConfig(),
6165
Diagnostics: diagnostics.DefaultDiagnosticsConfig(),
6266
Console: console.DefaultConsoleConfig(),
67+
EventBus: &eventBusCfg,
6368
}
6469
}
6570

@@ -160,6 +165,12 @@ func (c AdminConfig) Validate() error {
160165
} else if err := c.Engine.Validate(); err != nil {
161166
return bizerror.Wrap(err, bizerror.ConfigError, "engine config validation failed")
162167
}
168+
if c.EventBus == nil {
169+
cfg := eventbus.Default()
170+
c.EventBus = &cfg
171+
} else if err := c.EventBus.Validate(); err != nil {
172+
return bizerror.Wrap(err, bizerror.ConfigError, "event bus config validation failed")
173+
}
163174
return nil
164175
}
165176

pkg/core/discovery/subscriber/instance.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func (s *InstanceEventSubscriber) Name() string {
5555
return "Discovery-" + s.ResourceKind().ToString()
5656
}
5757

58+
func (s *InstanceEventSubscriber) AsyncEnabled() bool {
59+
return true
60+
}
61+
5862
func (s *InstanceEventSubscriber) ProcessEvent(event events.Event) error {
5963
newObj, ok := event.NewObj().(*meshresource.InstanceResource)
6064
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/nacos_service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func (n *NacosServiceEventSubscriber) Name() string {
5757
return "Nacos2Discovery-" + n.ResourceKind().ToString()
5858
}
5959

60+
func (n *NacosServiceEventSubscriber) AsyncEnabled() bool {
61+
return true
62+
}
63+
6064
func (n *NacosServiceEventSubscriber) ProcessEvent(event events.Event) error {
6165
newObj, ok := event.NewObj().(*meshresource.NacosServiceResource)
6266
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/rpc_instance.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ func (s *RPCInstanceEventSubscriber) ResourceKind() coremodel.ResourceKind {
6262
return meshresource.RPCInstanceKind
6363
}
6464

65+
func (s *RPCInstanceEventSubscriber) AsyncEnabled() bool {
66+
return true
67+
}
68+
6569
func (s *RPCInstanceEventSubscriber) ProcessEvent(event events.Event) error {
6670
newObj, ok := event.NewObj().(*meshresource.RPCInstanceResource)
6771
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/service_consumer_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (s *ServiceConsumerMetadataEventSubscriber) Name() string {
5353
return "Discovery-" + s.ResourceKind().ToString()
5454
}
5555

56+
func (s *ServiceConsumerMetadataEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (s *ServiceConsumerMetadataEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ServiceConsumerMetadataResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/service_provider_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (s *ServiceProviderMetadataEventSubscriber) Name() string {
5353
return "Discovery-" + s.ResourceKind().ToString()
5454
}
5555

56+
func (s *ServiceProviderMetadataEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (s *ServiceProviderMetadataEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ServiceProviderMetadataResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/zk_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (z *ZKConfigEventSubscriber) Name() string {
5353
return "Discovery-" + z.ResourceKind().ToString()
5454
}
5555

56+
func (z *ZKConfigEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (z *ZKConfigEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ZKConfigResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/discovery/subscriber/zk_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ func (z *ZKMetadataEventSubscriber) Name() string {
5353
return "Discovery-" + z.ResourceKind().ToString()
5454
}
5555

56+
func (z *ZKMetadataEventSubscriber) AsyncEnabled() bool {
57+
return true
58+
}
59+
5660
func (z *ZKMetadataEventSubscriber) ProcessEvent(event events.Event) error {
5761
newObj, ok := event.NewObj().(*meshresource.ZKMetadataResource)
5862
if !ok && event.NewObj() != nil {

pkg/core/engine/subscriber/runtime_instance.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func (s *RuntimeInstanceEventSubscriber) Name() string {
5656
return "Engine-" + s.ResourceKind().ToString()
5757
}
5858

59+
func (s *RuntimeInstanceEventSubscriber) AsyncEnabled() bool {
60+
return true
61+
}
62+
5963
func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) error {
6064
newObj, ok := event.NewObj().(*meshresource.RuntimeInstanceResource)
6165
if !ok && event.NewObj() != nil {

pkg/core/events/async.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package events
19+
20+
import "sync/atomic"
21+
22+
// AsyncSubscriber is an optional interface.
23+
// Subscribers implementing this interface can be dispatched asynchronously.
24+
type AsyncSubscriber interface {
25+
Subscriber
26+
AsyncEnabled() bool
27+
}
28+
29+
type subscriberState struct {
30+
subscriber Subscriber
31+
async bool
32+
ch chan Event
33+
done chan struct{}
34+
closed atomic.Bool
35+
drainerStarted atomic.Bool
36+
}

0 commit comments

Comments
 (0)