Skip to content

Commit dc5eb0d

Browse files
WyRainBowWyRainBow
andauthored
implement counter by key (#1390)
* implement counter by key * chore: trigger CI * Fix counter initialization errors and mesh change detection logic --------- Co-authored-by: WyRainBow <your-email@example.com>
1 parent 3d33ecd commit dc5eb0d

6 files changed

Lines changed: 279 additions & 57 deletions

File tree

pkg/console/counter/component.go

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ import (
2222
"math"
2323

2424
"github.com/apache/dubbo-admin/pkg/core/events"
25+
"github.com/apache/dubbo-admin/pkg/core/logger"
26+
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
27+
resmodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
2528
"github.com/apache/dubbo-admin/pkg/core/runtime"
29+
"github.com/apache/dubbo-admin/pkg/core/store"
2630
)
2731

2832
const ComponentType runtime.ComponentType = "counter manager"
@@ -41,7 +45,7 @@ var _ ManagerComponent = &managerComponent{}
4145
func (c *managerComponent) RequiredDependencies() []runtime.ComponentType {
4246
return []runtime.ComponentType{
4347
runtime.ResourceStore,
44-
runtime.EventBus, // Counter depends on EventBus to subscribe to events
48+
runtime.EventBus,
4549
}
4650
}
4751

@@ -64,6 +68,19 @@ func (c *managerComponent) Init(runtime.BuilderContext) error {
6468
}
6569

6670
func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
71+
storeComponent, err := rt.GetComponent(runtime.ResourceStore)
72+
if err != nil {
73+
return err
74+
}
75+
storeRouter, ok := storeComponent.(store.Router)
76+
if !ok {
77+
return fmt.Errorf("component %s does not implement store.Router", runtime.ResourceStore)
78+
}
79+
80+
if err := c.initializeCountsFromStore(storeRouter); err != nil {
81+
logger.Warnf("Failed to initialize counter manager from store: %v", err)
82+
}
83+
6784
component, err := rt.GetComponent(runtime.EventBus)
6885
if err != nil {
6986
return err
@@ -75,6 +92,73 @@ func (c *managerComponent) Start(rt runtime.Runtime, _ <-chan struct{}) error {
7592
return c.manager.Bind(bus)
7693
}
7794

95+
func (c *managerComponent) initializeCountsFromStore(storeRouter store.Router) error {
96+
if err := c.initializeResourceCount(storeRouter, meshresource.InstanceKind); err != nil {
97+
return fmt.Errorf("failed to initialize instance count: %w", err)
98+
}
99+
100+
if err := c.initializeResourceCount(storeRouter, meshresource.ApplicationKind); err != nil {
101+
return fmt.Errorf("failed to initialize application count: %w", err)
102+
}
103+
104+
if err := c.initializeResourceCount(storeRouter, meshresource.ServiceProviderMetadataKind); err != nil {
105+
return fmt.Errorf("failed to initialize service provider metadata count: %w", err)
106+
}
107+
108+
return nil
109+
}
110+
111+
func (c *managerComponent) initializeResourceCount(storeRouter store.Router, kind resmodel.ResourceKind) error {
112+
resourceStore, err := storeRouter.ResourceKindRoute(kind)
113+
if err != nil {
114+
return err
115+
}
116+
117+
allResources := resourceStore.List()
118+
cm := c.manager.(*counterManager)
119+
120+
for _, obj := range allResources {
121+
resource, ok := obj.(resmodel.Resource)
122+
if !ok {
123+
continue
124+
}
125+
126+
mesh := resource.ResourceMesh()
127+
if mesh == "" {
128+
mesh = "default"
129+
}
130+
131+
if counter, exists := cm.simpleCounters[kind]; exists {
132+
counter.Increment(mesh)
133+
}
134+
135+
if kind == meshresource.InstanceKind {
136+
instance, ok := resource.(*meshresource.InstanceResource)
137+
if ok && instance.Spec != nil {
138+
protocol := instance.Spec.GetProtocol()
139+
if protocol != "" {
140+
if cfg := cm.getDistributionConfig(kind, ProtocolCounter); cfg != nil {
141+
cfg.counter.Increment(mesh, protocol)
142+
}
143+
}
144+
145+
releaseVersion := instance.Spec.GetReleaseVersion()
146+
if releaseVersion != "" {
147+
if cfg := cm.getDistributionConfig(kind, ReleaseCounter); cfg != nil {
148+
cfg.counter.Increment(mesh, releaseVersion)
149+
}
150+
}
151+
152+
if cfg := cm.getDistributionConfig(kind, DiscoveryCounter); cfg != nil {
153+
cfg.counter.Increment(mesh, mesh)
154+
}
155+
}
156+
}
157+
}
158+
159+
return nil
160+
}
161+
78162
func (c *managerComponent) CounterManager() CounterManager {
79163
return c.manager
80164
}

pkg/console/counter/counter.go

Lines changed: 97 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,79 +19,149 @@ package counter
1919

2020
import (
2121
"sync"
22-
"sync/atomic"
2322
)
2423

2524
type Counter struct {
26-
name string
27-
value atomic.Int64
25+
name string
26+
data map[string]int64
27+
mu sync.RWMutex
2828
}
2929

3030
func NewCounter(name string) *Counter {
31-
return &Counter{name: name}
31+
return &Counter{
32+
name: name,
33+
data: make(map[string]int64),
34+
}
3235
}
3336

3437
func (c *Counter) Get() int64 {
35-
return c.value.Load()
38+
c.mu.RLock()
39+
defer c.mu.RUnlock()
40+
var sum int64
41+
for _, v := range c.data {
42+
sum += v
43+
}
44+
return sum
3645
}
3746

38-
func (c *Counter) Increment() {
39-
c.value.Add(1)
47+
func (c *Counter) GetByGroup(group string) int64 {
48+
if group == "" {
49+
group = "default"
50+
}
51+
c.mu.RLock()
52+
defer c.mu.RUnlock()
53+
return c.data[group]
4054
}
4155

42-
func (c *Counter) Decrement() {
43-
for {
44-
current := c.value.Load()
45-
if current == 0 {
46-
return
47-
}
48-
if c.value.CompareAndSwap(current, current-1) {
49-
return
56+
func (c *Counter) Increment(group string) {
57+
if group == "" {
58+
group = "default"
59+
}
60+
c.mu.Lock()
61+
defer c.mu.Unlock()
62+
c.data[group]++
63+
}
64+
65+
func (c *Counter) Decrement(group string) {
66+
if group == "" {
67+
group = "default"
68+
}
69+
c.mu.Lock()
70+
defer c.mu.Unlock()
71+
if value, ok := c.data[group]; ok {
72+
value--
73+
if value <= 0 {
74+
delete(c.data, group)
75+
} else {
76+
c.data[group] = value
5077
}
5178
}
5279
}
5380

5481
func (c *Counter) Reset() {
55-
c.value.Store(0)
82+
c.mu.Lock()
83+
defer c.mu.Unlock()
84+
c.data = make(map[string]int64)
5685
}
5786

5887
type DistributionCounter struct {
5988
name string
60-
data map[string]int64
89+
data map[string]map[string]int64
6190
mu sync.RWMutex
6291
}
6392

6493
func NewDistributionCounter(name string) *DistributionCounter {
6594
return &DistributionCounter{
6695
name: name,
67-
data: make(map[string]int64),
96+
data: make(map[string]map[string]int64),
6897
}
6998
}
7099

71-
func (c *DistributionCounter) Increment(key string) {
100+
func (c *DistributionCounter) Increment(group, key string) {
101+
if group == "" {
102+
group = "default"
103+
}
104+
if key == "" {
105+
key = "unknown"
106+
}
72107
c.mu.Lock()
73108
defer c.mu.Unlock()
74-
c.data[key]++
109+
if c.data[group] == nil {
110+
c.data[group] = make(map[string]int64)
111+
}
112+
c.data[group][key]++
75113
}
76114

77-
func (c *DistributionCounter) Decrement(key string) {
115+
func (c *DistributionCounter) Decrement(group, key string) {
116+
if group == "" {
117+
group = "default"
118+
}
119+
if key == "" {
120+
key = "unknown"
121+
}
78122
c.mu.Lock()
79123
defer c.mu.Unlock()
80-
if value, ok := c.data[key]; ok {
124+
groupData, exists := c.data[group]
125+
if !exists {
126+
return
127+
}
128+
if value, ok := groupData[key]; ok {
81129
value--
82130
if value <= 0 {
83-
delete(c.data, key)
131+
delete(groupData, key)
132+
if len(groupData) == 0 {
133+
delete(c.data, group)
134+
}
84135
} else {
85-
c.data[key] = value
136+
groupData[key] = value
86137
}
87138
}
88139
}
89140

90141
func (c *DistributionCounter) GetAll() map[string]int64 {
91142
c.mu.RLock()
92143
defer c.mu.RUnlock()
93-
result := make(map[string]int64, len(c.data))
94-
for k, v := range c.data {
144+
result := make(map[string]int64)
145+
for _, groupData := range c.data {
146+
for k, v := range groupData {
147+
result[k] += v
148+
}
149+
}
150+
return result
151+
}
152+
153+
func (c *DistributionCounter) GetByGroup(group string) map[string]int64 {
154+
if group == "" {
155+
group = "default"
156+
}
157+
c.mu.RLock()
158+
defer c.mu.RUnlock()
159+
groupData, exists := c.data[group]
160+
if !exists {
161+
return map[string]int64{}
162+
}
163+
result := make(map[string]int64, len(groupData))
164+
for k, v := range groupData {
95165
result[k] = v
96166
}
97167
return result
@@ -100,5 +170,5 @@ func (c *DistributionCounter) GetAll() map[string]int64 {
100170
func (c *DistributionCounter) Reset() {
101171
c.mu.Lock()
102172
defer c.mu.Unlock()
103-
c.data = make(map[string]int64)
173+
c.data = make(map[string]map[string]int64)
104174
}

0 commit comments

Comments
 (0)