Mocking Redis and Kafka in Go
While building a rules engine I have come at the project with a TDD approach. This means everything that can be mocked must be mocked. When I recently moved to multi-instance deployments, I needed to implement Kafka in order to process events across N instances. I couldn't find a good Kafka mocking library, so after some struggle I ended up building a simple and solid implementation. Through those learnings, I also built the same for Redis.
Different Types of Mocking
In the program I make use of two different types of objects which require a different approach to mocking.
Provider
The first is a provider approach. There is a single object throughout the application, and this object has a provider. The provider then gets set depending on the environment. This is useful when you are testing against third-party providers - I have done this for a stock data source, Stripe and an email provider.
The file structure for this type of mocking looks something like this (a representative layout - the exact file names below are illustrative):
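data/
    data.go           // DataSource struct and environment switching
    provider.go       // DataProvider interface and DataProviderService
    provider_live.go  // production client wrapping the third-party library
    mock.go           // MockData implementation used in tests
    mock_test.go      // tests exercising the mock behaviour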
There are a lot of files, but this gives you the flexibility you need. Below we have a main data struct, with a different provider set depending on the environment:
const (
	DataProviderOne  = "third_party_production_one"
	DataProviderTest = "test_data_provider"
)

type DataSource struct {
	Provider DataProviderService
	Ctx      context.Context
}

func (d *DataSource) setRepo(env string, errState bool) {
	if env == "test" {
		repo.SetErrorState(errState)
	}
}

func (d *DataSource) setProvider(provider string, state bool) (err error) {
	if provider != DataProviderOne && provider != DataProviderTest {
		return fmt.Errorf("could not set data provider. Must be one of %s or %s, you provided %s", DataProviderOne, DataProviderTest, provider)
	}
	d.Provider.setClient(provider, state)
	return
}
When the application begins, the environment is set depending on whether tests are being run or not. Throughout the application, the provider is the same.
This is done so that the underlying provider can be swapped out or set depending on which source you need. For example, you can have several data providers in the application, each pointing to a different third party - say, if you need to pull data from Yahoo, Google, and AnotherProvider.
The underlying data provider looks as follows:
type DataProvider interface {
	KeyStats(ctx context.Context, stock string) (keyStats thirdPartyLib.KeyStats, err error)
	HistoricalPrices(ctx context.Context, symbol string, timeframe thirdPartyLib.HistoricalTimeFrame, options *thirdPartyLib.HistoricalOptions) (data []thirdPartyLib.HistoricalDataPoint, err error)
	InsiderTransactions(ctx context.Context, symbol string) (insiderTransactions []thirdPartyLib.InsiderTransaction, err error)
	AdvancedStats(ctx context.Context, symbol string) (companyStats thirdPartyLib.AdvancedStats, err error)
	Company(ctx context.Context, symbol string) (company thirdPartyLib.Company, err error)
}

type DataProviderService struct {
	API DataProvider
}
func (dp *DataProviderService) setClient(provider string, state bool) {
	switch provider {
	case DataProviderOne:
		dp.API = thirdPartyLib.NewClient(os.Getenv("PRODUCTION_API_KEY"), thirdPartyLib.WithBaseURL(BaseURL))
	case DataProviderTest:
		mock := &MockData{}
		mock.setError(state)
		dp.API = mock
	}
}
Mocking makes heavy use of interfaces. In the above, MockData needs to conform to the DataProvider interface by having the same functions with the same signatures. I use error states in all mocks to simulate failures at different levels of the application.
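As a sketch of what that conformance looks like - the field name and the canned return values here are my illustration, not the real mock - implementing one method shows the pattern the rest follow:

type MockData struct {
	errState bool
}

func (m *MockData) setError(state bool) {
	m.errState = state
}

// Company implements one of the DataProvider methods. The others
// (KeyStats, HistoricalPrices, and so on) follow the same shape:
// return a canned value, or an error when the error state is set.
func (m *MockData) Company(ctx context.Context, symbol string) (thirdPartyLib.Company, error) {
	if m.errState {
		return thirdPartyLib.Company{}, fmt.Errorf("mock: simulated failure for %s", symbol)
	}
	return thirdPartyLib.Company{}, nil
}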
Another approach is to have several providers set under DataProviderOne as "production" providers. Here you can instantiate your Yahoo and Google services, and the functions can then call the relevant services for specific data.
Direct
The second approach could be called a direct approach. This is when the top-level object is either the production or the mock instance. I use this when I don't need to have several providers and the associated flexibility.
This is what the file structure looks like - again a representative layout rather than the exact repository:
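cache/
    cache.go       // CacheClient backed by the real redis.Client
    cache_mock.go  // CacheMockClient backed by an in-memory map
    cache_test.go  // tests exercising the mock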
And this is what the object and interface look like:
type CacheClient struct {
	cache *redis.Client
}

type CacheClientInterface interface {
	setRepo(env string, errState bool) (err error)
	loadRedis(namespace, host, port, password string, db int) (err error)
	GetErrorState() bool
	setHostKey()
	HostKey() string
	WaitForHostKeyExpiration()
	IsLeader() (err error)
	GetCache(key string, result interface{}) (err error)
	DeleteCache(key string) (err error)
	Cache(key string, obj interface{}) (err error)
	CacheTTL(key string, obj interface{}, ttl time.Duration) (err error)
	NewCache() (err error)
	ClearCache() (err error)
}
Here, the cache field on CacheClient is only set in production, and the mock has different fields that are only applicable to testing.
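Selecting between the two at startup can then be a small constructor. This is a minimal sketch assuming both types satisfy CacheClientInterface; NewCacheClient is my name for it, not the post's:

func NewCacheClient(env string) CacheClientInterface {
	if env == "test" {
		// The mock carries its own fields: the in-memory map, error state, etc.
		return &CacheMockClient{}
	}
	// In production, the cache field is populated later via loadRedis.
	return &CacheClient{}
}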
Mocking Kafka
When I initially wanted to mock the Kafka service, I decided to use the provider approach. This would mean creating an interface that conformed to the Kafka library I was using: a top-level object with a service provider, where the provider in production would be the Kafka library and in testing would be the mock library. However, the third-party library seemed too complicated to wrap - it would return opaque types such as *redis.Cmd, and I had to figure out what those were and how to fit them to my needs.
I went back to the drawing board and decided to take the direct approach, which turned out to be far easier and the better approach overall.
The first approach I took to mocking Kafka was to use slices. The slice would keep the messages that were received, and then would "read" them by returning the given message and removing it from the slice. This didn't work out well due to Kafka's asynchronous nature - I think the decoupling of the readers and writers had a part to play in the complexity. After spending some time on this, I decided to use a better, more direct approach that is native to Go: channels.
The implementation of this is simple (a sketch of the supporting declarations follows the list):
- Receive a message through the Write method
- Add the message to the channel
- Increase the WaitGroup count
- Have another Read method running continuously, listening to the channel
- Process the message
- Decrease the WaitGroup count
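Concretely, the supporting pieces might be declared like this. The post only shows fragments, so the channel buffer size and the exact struct fields here are assumptions pieced together from the snippets that follow:

var (
	messageChannel = make(chan kafka.Message, 100) // buffered so writers don't block immediately
	mockMutex      = &sync.Mutex{}                 // guards shared mock state
)

type MockMessageSlice struct {
	Messages []*kafka.Message
	mutex    *sync.RWMutex
	wg       *sync.WaitGroup
}

type MockMessagingService struct {
	messages *MockMessageSlice
}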
On program initialisation, if tests are running the setup function is called which sets up the object:
func (m *MockMessagingService) Setup() {
	m.messages = &MockMessageSlice{
		Messages: []*kafka.Message{},
		mutex:    &sync.RWMutex{},
		wg:       &sync.WaitGroup{},
	}
	go m.startReaderRunner()
}
The writer works as follows:
func (m *MockMessagingService) WriteMessages(topic string, contents []string) (err error) {
	for _, content := range contents {
		m.messages.AddMessage(kafka.Message{
			Key:   []byte(uuid.New().String()),
			Value: []byte(content),
			Topic: topic,
		})
	}
	Info.Printf("Mock: Wrote %d messages to topic %s", len(contents), topic)
	return
}
func (ms *MockMessageSlice) AddMessage(message kafka.Message) {
	if message.Key == nil {
		return
	}
	ms.wg.Add(1)
	messageChannel <- message
}
And the reader processes messages for as long as the program is running, through the startReaderRunner method:
func (m *MockMessagingService) startReaderRunner() {
	for message := range messageChannel {
		// Unlock explicitly each iteration; a deferred unlock would never
		// run inside this endless loop, and the next Lock would deadlock.
		mockMutex.Lock()
		go m.processMessageFromReader(message)
		mockMutex.Unlock()
	}
}
When messages are finished processing in the processMessageFromReader function, the WaitGroup is decremented.
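The shape of that function, with the actual dispatch elided - handleMessage here is a hypothetical stand-in for whatever your service does with a consumed message:

func (m *MockMessagingService) processMessageFromReader(message kafka.Message) {
	// Always decrement, even if handling fails, so wg.Wait() can return.
	defer m.messages.wg.Done()
	m.handleMessage(message) // hypothetical: route to the same handlers production Kafka would
}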
This works really well. There are two considerations that tripped me up:
- If the processMessageFromReader call results in another Kafka message being written and processed, and depends on that process completing, you will sit and wait forever in this loop. I moved the processing into a goroutine to avoid stuck processing.
- wg.Wait() needs to be called after each test that uses Kafka (see the sketch after this list). This is not ideal, as you might not know which functions require it, but here we are.
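What that looks like in a test, roughly - the test body and topic name are illustrative:

func TestProcessesEvent(t *testing.T) {
	svc := &MockMessagingService{}
	svc.Setup()
	if err := svc.WriteMessages("events", []string{`{"id": 1}`}); err != nil {
		t.Fatal(err)
	}
	// Block until the reader goroutine has drained and processed every message.
	svc.messages.wg.Wait()
	// Assertions against whatever processMessageFromReader changed go here.
}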
Mocking Redis
As Redis is effectively a key-value store, you can use a map to keep these values in memory:
var MockCacheMap = make(map[string]string)
The key to using maps or slices successfully as a cache is a mutex. I take the mutex any time a value is changed or accessed - and the second part is crucial: even when a value is only read (any value, not just the map or slice), you still need to use the mutex.
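For reference, the mutexes used in the snippets below can be declared alongside the map. The names match the code that follows, but the struct fields are a minimal assumption:

var (
	mockCacheMapMutex = &sync.Mutex{} // guards MockCacheMap
	MockCacheMutex    = &sync.Mutex{} // guards the fields on CacheMockClient
)

type CacheMockClient struct {
	ErrState bool // when set, every call returns ErrCacheMock
}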
Here is an example of setting and getting the error state:
func (c *CacheMockClient) setRepo(env string, errState bool) (err error) {
	MockCacheMutex.Lock()
	defer MockCacheMutex.Unlock()
	c.ErrState = errState
	return nil
}

func (c *CacheMockClient) GetErrorState() bool {
	MockCacheMutex.Lock()
	defer MockCacheMutex.Unlock()
	return c.ErrState
}
The following functions are the key functions for using this mock as Redis:
func (c *CacheMockClient) GetCache(key string, result interface{}) (err error) {
	if c.GetErrorState() {
		return ErrCacheMock
	}
	mockCacheMapMutex.Lock()
	defer mockCacheMapMutex.Unlock()
	// A miss leaves result untouched and returns nil.
	if val, ok := MockCacheMap[key]; ok {
		err = json.Unmarshal([]byte(val), result)
		if err != nil {
			return err
		}
	}
	return nil
}

func (c *CacheMockClient) DeleteCache(key string) (err error) {
	if c.GetErrorState() {
		return ErrCacheMock
	}
	mockCacheMapMutex.Lock()
	defer mockCacheMapMutex.Unlock()
	delete(MockCacheMap, key)
	return nil
}

func (c *CacheMockClient) ClearCache() (err error) {
	if c.GetErrorState() {
		return ErrCacheMock
	}
	mockCacheMapMutex.Lock()
	defer mockCacheMapMutex.Unlock()
	MockCacheMap = make(map[string]string)
	return nil
}

func (c *CacheMockClient) Cache(key string, obj interface{}) (err error) {
	if c.GetErrorState() {
		return ErrCacheMock
	}
	mockCacheMapMutex.Lock()
	defer mockCacheMapMutex.Unlock()
	val, err := json.Marshal(obj)
	if err != nil {
		return err
	}
	MockCacheMap[key] = string(val)
	return nil
}
I added in a ClearCache function for testing purposes - to see what happens when data is unavailable, and to test origin-only data on demand.
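A quick round trip shows how the pieces fit together in a test - the quote type and key are illustrative:

type quote struct {
	Symbol string
	Price  float64
}

func TestCacheRoundTrip(t *testing.T) {
	c := &CacheMockClient{}
	if err := c.Cache("quote:AAPL", quote{Symbol: "AAPL", Price: 187.4}); err != nil {
		t.Fatal(err)
	}
	var got quote
	if err := c.GetCache("quote:AAPL", &got); err != nil {
		t.Fatal(err)
	}
	// Force a miss so the code under test has to go back to the origin.
	if err := c.ClearCache(); err != nil {
		t.Fatal(err)
	}
}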
Note: the map and mutex should be part of the struct (as they are in the Kafka client), not global variables. This makes no difference to how the program runs, but globals are not good practice.
Conclusion
Mocking is a key part of any solid testing suite. When I first wanted to mock these services I couldn't find much online, and a lot of what I found didn't work for my use case. Hopefully this has helped in some way on your mocking journey.