diff --git a/pkg/config/event_dispatcher.go b/pkg/config/event_dispatcher.go index ad5ea25b7c..b1697bb61b 100644 --- a/pkg/config/event_dispatcher.go +++ b/pkg/config/event_dispatcher.go @@ -17,9 +17,11 @@ package config import ( "strings" + "sync" ) type EventDispatcher struct { + mut sync.RWMutex registry map[string][]EventHandler keyPrefix []string } @@ -32,10 +34,14 @@ func NewEventDispatcher() *EventDispatcher { } func (ed *EventDispatcher) Get(key string) []EventHandler { + ed.mut.RLock() + defer ed.mut.RUnlock() return ed.registry[formatKey(key)] } func (ed *EventDispatcher) Dispatch(event *Event) { + ed.mut.RLock() + defer ed.mut.RUnlock() var hs []EventHandler realKey := formatKey(event.Key) hs, ok := ed.registry[realKey] @@ -55,6 +61,8 @@ func (ed *EventDispatcher) Dispatch(event *Event) { // register a handler to watch specific config changed func (ed *EventDispatcher) Register(key string, handler EventHandler) { + ed.mut.Lock() + defer ed.mut.Unlock() key = formatKey(key) v, ok := ed.registry[key] if ok { @@ -66,6 +74,8 @@ func (ed *EventDispatcher) Register(key string, handler EventHandler) { // register a handler to watch specific config changed func (ed *EventDispatcher) RegisterForKeyPrefix(keyPrefix string, handler EventHandler) { + ed.mut.Lock() + defer ed.mut.Unlock() keyPrefix = formatKey(keyPrefix) v, ok := ed.registry[keyPrefix] if ok { @@ -77,6 +87,8 @@ func (ed *EventDispatcher) RegisterForKeyPrefix(keyPrefix string, handler EventH } func (ed *EventDispatcher) Unregister(key string, handler EventHandler) { + ed.mut.Lock() + defer ed.mut.Unlock() key = formatKey(key) v, ok := ed.registry[key] if !ok { diff --git a/pkg/config/event_dispatcher_test.go b/pkg/config/event_dispatcher_test.go new file mode 100644 index 0000000000..0b8425a843 --- /dev/null +++ b/pkg/config/event_dispatcher_test.go @@ -0,0 +1,141 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + "go.uber.org/atomic" +) + +type EventDispatcherSuite struct { + suite.Suite + + dispatcher *EventDispatcher +} + +func (s *EventDispatcherSuite) SetupTest() { + s.dispatcher = NewEventDispatcher() +} + +func (s *EventDispatcherSuite) TestRegister() { + dispatcher := s.dispatcher + + s.Run("test_register_same_key", func() { + dispatcher.Register("a", NewHandler("handler_1", func(*Event) {})) + dispatcher.Register("a", NewHandler("handler_2", func(*Event) {})) + + handlers := dispatcher.Get("a") + s.ElementsMatch([]string{"handler_1", "handler_2"}, lo.Map(handlers, func(h EventHandler, _ int) string { return h.GetIdentifier() })) + }) + + s.Run("test_register_same_id", func() { + dispatcher.Register("b", NewHandler("handler_1", func(*Event) {})) + dispatcher.Register("b", NewHandler("handler_1", func(*Event) {})) + + handlers := dispatcher.Get("b") + s.ElementsMatch([]string{"handler_1", "handler_1"}, lo.Map(handlers, func(h EventHandler, _ int) string { return h.GetIdentifier() })) + }) +} + +func (s *EventDispatcherSuite) TestRegisterForKeyPrefix() { + dispatcher := s.dispatcher + + s.Run("test_register_same_key", func() { + dispatcher.RegisterForKeyPrefix("a", NewHandler("handler_1", func(*Event) {})) + dispatcher.RegisterForKeyPrefix("a", NewHandler("handler_2", func(*Event) {})) + + handlers := dispatcher.Get("a") + s.ElementsMatch([]string{"handler_1", "handler_2"}, lo.Map(handlers, func(h EventHandler, _ int) string { return h.GetIdentifier() })) + s.Contains(dispatcher.keyPrefix, "a") + }) + + s.Run("test_register_same_id", func() { + dispatcher.RegisterForKeyPrefix("b", NewHandler("handler_1", func(*Event) {})) + dispatcher.RegisterForKeyPrefix("b", NewHandler("handler_1", func(*Event) {})) + + handlers := dispatcher.Get("b") + s.ElementsMatch([]string{"handler_1", "handler_1"}, lo.Map(handlers, func(h EventHandler, _ int) string { return h.GetIdentifier() })) + s.Contains(dispatcher.keyPrefix, "b") + }) +} + +func (s *EventDispatcherSuite) TestUnregister() { + dispatcher := s.dispatcher + + s.Run("unregister_non_exist_key", func() { + s.NotPanics(func() { + dispatcher.Unregister("non_register", NewHandler("handler_1", func(*Event) {})) + }) + }) + + s.Run("unregister_non_exist_id", func() { + dispatcher.Register("b", NewHandler("handler_1", func(*Event) {})) + + s.NotPanics(func() { + dispatcher.Unregister("b", NewHandler("handler_2", func(*Event) {})) + }) + + handlers := dispatcher.Get("b") + s.ElementsMatch([]string{"handler_1"}, lo.Map(handlers, func(h EventHandler, _ int) string { return h.GetIdentifier() })) + }) + + s.Run("unregister_exist_handler", func() { + dispatcher.Register("c", NewHandler("handler_1", func(*Event) {})) + + s.NotPanics(func() { + dispatcher.Unregister("c", NewHandler("handler_1", func(*Event) {})) + }) + + handlers := dispatcher.Get("c") + s.ElementsMatch([]string{}, lo.Map(handlers, func(h EventHandler, _ int) string { return h.GetIdentifier() })) + }) +} + +func (s *EventDispatcherSuite) TestDispatch() { + dispatcher := s.dispatcher + + s.Run("dispatch_key_event", func() { + called := atomic.NewBool(false) + + dispatcher.Register("a", NewHandler("handler_1", func(*Event) { called.Store(true) })) + + dispatcher.Dispatch(newEvent("test", "test", "aa", "b")) + + s.False(called.Load()) + + dispatcher.Dispatch(newEvent("test", "test", "a", "b")) + + s.True(called.Load()) + }) + + s.Run("dispatch_prefix_event", func() { + called := atomic.NewBool(false) + + dispatcher.RegisterForKeyPrefix("b", NewHandler("handler_1", func(*Event) { called.Store(true) })) + + dispatcher.Dispatch(newEvent("test", "test", "bb", "b")) + + s.True(called.Load()) + }) +} + +func TestEventDispatcher(t *testing.T) { + suite.Run(t, new(EventDispatcherSuite)) +}