Sunday, 27 September 2015

Synchronizing Redis local caches for distributed multi-subscriber scenarios using C#

Redis is a great technology for improving performance across clients and servers. It takes advantage of today's low memory prices to keep data in RAM, so that repeated requests for the same data do not result in unnecessary calls to the database or to other resources with far slower I/O than Redis. In this article, I present a RedisMemoryProvider that works across multiple clients. A client can be anything: a node in an NLB cluster running WCF services, a Windows Forms client, a WPF client and so on.

Redis gives better performance, but how do you keep the clients in sync? In other words, if there are multiple writers, how do the readers know when to update? One strategy is pull-based: cache data that we know rarely changes and pull fresh data at a fixed interval (say, once an hour). A push-based strategy is better, and luckily Redis supports the publish-subscribe pattern. Redis can notify the other readers or clients when a writer has made a change, so that they can update.

Redis provides the building blocks, but a wrapper or provider sophisticated enough to put them together still has to be written before Redis can be used this way. This article presents such code. There are many possible design choices for how a Redis wrapper/provider can support this. I made the following choices:
  1. The code should use C# and Redis.
  2. The C# Redis client library used is ServiceStack.Redis.
  3. Cache invalidation should notify multiple subscribers when a change has been made and the cache is invalidated.
  4. A local in-memory cache should be kept for performance reasons.
  5. The wrapper/provider should support a full CRUD scenario and offer convenience methods such as InsertRange() and ClearAll().
  6. It should be easy to point the code to another Redis server.
  7. A multiple-readers scenario should be supported using the Redis Pub/Sub functionality.
The following code is the wrapper for working against a Redis cache:

using Newtonsoft.Json;
using ServiceStack.Redis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RedisClientPubSub
{

    //Based on: http://stackoverflow.com/questions/30818784/generic-object-cache
    public class RedisMemoryProvider<T> : IDisposable, RedisClientPubSub.IRedisMemoryProvider<T> 
    {

        private static readonly PooledRedisClientManager m = new PooledRedisClientManager(new string[] { 
            ConfigurationManager.AppSettings["RedisServer"] });

        readonly IDictionary<Type, List<object>> _cache = new ConcurrentDictionary<Type, List<object>>();

        private static readonly RedisPubSubManager _redisPubsub = new RedisPubSubManager();


        public delegate void RedisAlterationInfoHandler(RedisSyncInfoDataContract syncInfo);

        public event RedisAlterationInfoHandler OnRedisAlterationInfo;

        public RedisMemoryProvider()
        {
            LoadIntoCache<T>();
            _redisPubsub.OnRedisAlteration += OnRedisAlteration;
        }

        public void Dispose()
        {
            if (_redisPubsub != null)
                _redisPubsub.OnRedisAlteration -= OnRedisAlteration;
        }

        public void CloseSubscription()
        {
            if (_redisPubsub != null)
            {
                _redisPubsub.CloseSubscription(); 
            }
        }

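        private void OnRedisAlteration(RedisSyncInfoDataContract syncInfo)
        {
            if (syncInfo == null || syncInfo.KeyType == null)
                return;

            var item = JsonConvert.DeserializeObject(syncInfo.SerializedPayload, syncInfo.KeyType);
            if (item == null)
                return;

            // Calling Create/Delete/Update with an object-typed item would infer T = object
            // and file the change under typeof(object), so edit the list for KeyType directly.
            List<object> list;
            if (!_cache.TryGetValue(syncInfo.KeyType, out list))
            {
                list = new List<object>();
                _cache[syncInfo.KeyType] = list;
            }

            // The deserialized item is a new instance, so cached entries are matched by an
            // "Id" property (an assumption; ServiceStack.Redis relies on the same convention).
            var idProperty = syncInfo.KeyType.GetProperty("Id");
            var itemId = idProperty != null ? idProperty.GetValue(item, null) : null;
            Predicate<object> sameEntity = x => idProperty != null
                ? Equals(idProperty.GetValue(x, null), itemId)
                : ReferenceEquals(x, item);

            bool changeMade = false;

            switch (syncInfo.Alteration)
            {
                case RedisAlteration.Added:
                    list.Add(item);
                    changeMade = true;
                    break;
                case RedisAlteration.Deleted:
                    changeMade = list.RemoveAll(sameEntity) > 0;
                    break;
                case RedisAlteration.Invalidated:
                    // Replace the stale copy with the fresh one.
                    list.RemoveAll(sameEntity);
                    list.Add(item);
                    changeMade = true;
                    break;
                default:
                    break;
            }

            if (changeMade && OnRedisAlterationInfo != null)
                OnRedisAlterationInfo(syncInfo);
        }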

        /// <summary>
        /// Load {T} into object cache from Data Store.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        private void LoadIntoCache<T>()
        {
            _cache[typeof(T)] = GetAll<T>().Cast<object>().ToList();
        }

        /// <summary>
        /// Find Single {T} in object cache.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="predicate">linq statement</param>
        /// <returns></returns>
        public T Read(Func<T, bool> predicate)
        {
            List<object> list;
            if (_cache.TryGetValue(typeof(T), out list))
            {
                return list.Cast<T>().Where(predicate).FirstOrDefault();
            }
            return default(T);
        }

        /// <summary>
        /// Find all {T} matching the predicate in the object cache.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="predicate">linq statement</param>
        /// <returns></returns>
        public List<T> FindBy<T>(Func<T, bool> predicate) where T : class
        {
            List<object> list;
            if (_cache.TryGetValue(typeof(T), out list))
            {
                return list.Cast<T>().Where(predicate).ToList();
            }
            return new List<T>();
        }

        public T FindById<T>(long id)
        {
            using (var ctx = m.GetClient())
            {
                T foundItem = ctx.GetById<T>(id);
                return foundItem;
            }
        }

        public IList<T> FindByIds<T>(long[] ids)
        {
            using (var ctx = m.GetClient())
            {
                IList<T> foundItems = ctx.GetByIds<T>(ids);
                return foundItems;
            }
        }

        public void Create<T>(T entity, bool onlyLocal = false) where T : class
        {
            List<object> list;
            if (!_cache.TryGetValue(typeof(T), out list))
            {
                list = new List<object>();
            }
            list.Add(entity);
            _cache[typeof(T)] = list;
            if (!onlyLocal)
                Store<T>(entity, RedisAlteration.Added);
        }

        public void InsertRange<T>(IList<T> entities, bool onlyLocal = false) where T : class 
        {
            List<object> list;
            if (!_cache.TryGetValue(typeof(T), out list))
            {
                list = new List<object>();
            }
            list.AddRange(entities);
            _cache[typeof(T)] = list;
            if (!onlyLocal)
            {
                foreach (var entity in entities)
                {
                    Store<T>(entity, RedisAlteration.Added);
                }
            }
        }

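        /// <summary>
        /// Delete single {T} from cache and, unless onlyLocal is set, from the Data Store.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="entity">class object</param>
        /// <param name="onlyLocal">true to change only the local cache, without touching Redis or publishing</param>
        public void Delete<T>(T entity, bool onlyLocal = false) where T : class
        {
            List<object> list;
            if (_cache.TryGetValue(typeof(T), out list))
            {
                // Note: List.Remove uses Equals, which is reference equality by default.
                list.Remove(entity);
                _cache[typeof(T)] = list;

                // Honor onlyLocal; otherwise a change applied from a notification is published again.
                if (!onlyLocal)
                    RedisDelete<T>(entity, RedisAlteration.Deleted);
            }
        }      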

        public void ClearAll<T>(bool onlyLocal = false) where T : class
        {
            List<object> list;
            if (_cache.TryGetValue(typeof(T), out list))
            {
                // Honor onlyLocal: only delete from Redis (and publish) when asked to.
                if (!onlyLocal)
                {
                    foreach (T entity in list)
                    {
                        RedisDelete<T>(entity, RedisAlteration.Deleted);
                    }
                }
                list.Clear(); 
                _cache[typeof(T)] = list;
            }
        }

        public long Next<T>() where T : class
        {
            long id = 1;

            using (var ctx = m.GetClient())
            {
                try
                {
                    id = ctx.As<T>().GetNextSequence();
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }
            return id;
        }

        public IList<T> GetAll<T>() 
        {
            using (var ctx = m.GetClient())
            {
                try
                {
                    return ctx.As<T>().GetAll();
                }
                catch (Exception err)
                {
                    Debug.WriteLine(err.Message);
                    return new List<T>();
                }
            }
        }

        public void Update<T>(Func<T, bool> predicate, T entity, bool onlyLocal = false) where T : class
        {
            List<object> list;

            if (_cache.TryGetValue(typeof(T), out list))
            {
                var existing = list.Cast<T>().FirstOrDefault(predicate);
                if (existing != null)
                    list.Remove(existing);
                list.Add(entity);
                _cache[typeof(T)] = list;
                if (!onlyLocal)
                    Store<T>(entity, RedisAlteration.Invalidated);
            }
        }

        public bool ExpireAt(string keyName, int expireInSeconds)
        {
            using (var client = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                return client.Expire(keyName, expireInSeconds);
            }
        }

        public long GetTtl(string keyName)
        {
            using (var client = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                return client.Ttl(keyName);
            }
        }

        public void Set(string keyName, string content)
        {
            using (var client = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                client.Set(keyName, Encoding.UTF8.GetBytes(content));
            }
        }

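        public string Get(string keyName)
        {
            using (var client = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                // A missing key yields null, and Encoding.GetString(null) would throw.
                byte[] value = client.Get(keyName);
                return value == null ? null : Encoding.UTF8.GetString(value);
            }
        }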

        public IDictionary<string, string> GetInfo()
        {
            using (var client = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                return client.Info;
            }
        }

        public bool Ping()
        {
            using (var client = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                return client.Ping();
            }
        }

        #region Private methods

        private void Store<T>(T entity, RedisAlteration alteration) where T : class
        {
            using (var ctx = m.GetClient())
            {
                ctx.Store<T>(entity);
                PublishChange<T>(entity, alteration);
            }
        }

        private static void PublishChange<T>(T entity, RedisAlteration alteration) where T : class
        {
            _redisPubsub.Publish(new RedisSyncInfoDataContract
            {
                KeyType = typeof(T),
                Alteration = alteration,
                SerializedPayload = JsonConvert.SerializeObject(entity)
            });
        }

        private void RedisDelete<T>(T entity, RedisAlteration alteration) where T : class
        {
            using (var ctx = m.GetClient())
            {
                ctx.As<T>().Delete(entity);
                PublishChange<T>(entity, alteration);
            }
        }

        private T Find<T>(long id) where T : class
        {
            using (var ctx = m.GetClient())
            {
                return ctx.As<T>().GetById(id);
            }
        }

        #endregion


    }
}


Note that the class above also contains the logic for publishing changes to Redis for the different kinds of cache invalidation that can happen: insertions, updates and deletions. The code uses the NuGet package ServiceStack.Redis to communicate with Redis. You will need an appSetting named RedisServer that holds the FQDN of your Redis server. We need some more code for the subscription and publishing routines against Redis:

using Newtonsoft.Json;
using ServiceStack.Redis;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;

namespace RedisClientPubSub
{
    
    public class RedisPubSubManager : IDisposable, RedisClientPubSub.IRedisPubSubManager
    {

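        public RedisPubSubManager()
        {
            // Assign the id before the subscription starts, so the message handler
            // never compares PublisherId against a null id.
            _subscriptionId = Guid.NewGuid().ToString(); 
            CreateSubscription();
        }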

        public string SubscriptionId
        {
            get
            {
                return _subscriptionId; 
            }
        }


        public void Dispose()
        {
            CloseSubscription();
        }

        public void CloseSubscription()
        {
            if (_subscription != null)
            {
                try
                {
                    _subscription.UnSubscribeFromAllChannels();
                }
                catch (Exception err)
                {
                    Debug.WriteLine(err.Message);
                }
            }
        }

        public delegate void RedisAlterationHandler(RedisSyncInfoDataContract syncInfo); 

        public event RedisAlterationHandler OnRedisAlteration;

        private IRedisSubscription _subscription;
       
        private string _subscriptionId;

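        private void CreateSubscription()
        {
            // SubscribeToChannels blocks until the subscription ends, so it runs on its own task.
            Task.Factory.StartNew(() =>
             {
                 using (var redisClient = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
                 {
                     var subscription = redisClient.CreateSubscription();
                     subscription.OnMessage = (c, m) =>
                     {                         
                         if (m == RedisConstants.Stop)
                         {
                             subscription.UnSubscribeFromAllChannels();
                             return; // a stop message carries no sync payload
                         }
                         SubscribeEvent(c, m);
                     };
                     // Assign before subscribing: the blocking call below would otherwise leave
                     // _subscription null for as long as the subscription is active.
                     _subscription = subscription;
                     subscription.SubscribeToChannels(new[] { RedisConstants.RedisCommonChannel });
                 } //using 
             }, TaskCreationOptions.LongRunning);

        }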

        public void Publish(RedisSyncInfoDataContract redisItem)
        {
            using (var redisClient = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                redisItem.PublisherId = _subscriptionId;
                var jsonSerialized = JsonConvert.SerializeObject(redisItem);
                redisClient.Publish(RedisConstants.RedisCommonChannel, Encoding.UTF8.GetBytes(jsonSerialized));
            } //using 
        }

        private void SubscribeEvent(string channel, string message)
        {
            try
            {
                var syncInfo = JsonConvert.DeserializeObject<RedisSyncInfoDataContract>(message);
                if (syncInfo != null)
                {
                    if (syncInfo.PublisherId == _subscriptionId)
                        return; //published from the same node 

                    if (OnRedisAlteration != null)
                        OnRedisAlteration(syncInfo);
                    
                }
            }
            catch (Exception err)
            {
                Debug.WriteLine(err.Message);
            }
        }  

    }

}
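
SubscribeEvent above sees every message on the common channel, including the ones its own process published, and drops those by comparing PublisherId with its own id. The sketch below shows that filtering in isolation, without a Redis server. FakeChannel, Node and Demo are made-up names for illustration only and are not part of the sample; FakeChannel is an in-memory stand-in for the Redis channel.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the shared Redis channel (hypothetical, illustration only).
public class FakeChannel
{
    private readonly List<Action<string, string>> _handlers = new List<Action<string, string>>();

    public void Subscribe(Action<string, string> handler)
    {
        _handlers.Add(handler);
    }

    // Delivers the message to every subscriber, the publisher included.
    public void Publish(string publisherId, string payload)
    {
        foreach (var handler in _handlers)
            handler(publisherId, payload);
    }
}

// One client with its own id, comparable to RedisPubSubManager.SubscriptionId.
public class Node
{
    private readonly string _id = Guid.NewGuid().ToString();
    private readonly FakeChannel _channel;

    public readonly List<string> Received = new List<string>();

    public Node(FakeChannel channel)
    {
        _channel = channel;
        _channel.Subscribe(OnMessage);
    }

    public void Publish(string payload)
    {
        _channel.Publish(_id, payload);
    }

    private void OnMessage(string publisherId, string payload)
    {
        if (publisherId == _id)
            return; // our own change; the local cache is already up to date

        Received.Add(payload);
    }
}

public static class Demo
{
    public static void Main()
    {
        var channel = new FakeChannel();
        var a = new Node(channel);
        var b = new Node(channel);

        a.Publish("vehicle 1 added");

        Console.WriteLine(a.Received.Count); // 0: the publisher ignores its own message
        Console.WriteLine(b.Received.Count); // 1: the other node acts on it
    }
}
```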


Changes are published to all clients, and the changed data is serialized with the NuGet package Newtonsoft.Json. The serialized payload is then deserialized on the receiving side. Note that a client only acts on messages whose PublisherId differs from its own. All clients, or readers, subscribe to one common channel, so the client that performed the update receives its own message too, and it has already updated its local cache. The alterations and cache invalidations that can happen are defined in an enum:

using System;
using System.Runtime.Serialization;

namespace RedisClientPubSub
{
    
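    // Not a [Flags] enum: the values are mutually exclusive, and with [Flags] the value 3 would read as Added | Deleted.
    [DataContract]
    public enum RedisAlteration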
    {

        [EnumMember]
        None = 0,

        [EnumMember]
        Added = 1,

        [EnumMember]
        Deleted = 2,

        [EnumMember]
        Invalidated = 3        

    }

}
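
The listings use RedisSyncInfoDataContract and RedisConstants without showing them. The following is a minimal sketch of what they could look like, inferred only from the members the code above reads and writes. The channel name and the stop value are placeholders I made up, and SyncInfoRoundTrip is a hypothetical helper that just illustrates the serialize/deserialize path. It relies on Json.NET writing a System.Type as its assembly-qualified name, which is my understanding of its behavior, and it compiles together with the RedisAlteration enum above.

```csharp
using System;
using System.Runtime.Serialization;
using Newtonsoft.Json;

namespace RedisClientPubSub
{
    // Sketch: the message that travels over the common channel.
    // Only the members used by the listings above are included.
    [DataContract]
    public class RedisSyncInfoDataContract
    {
        [DataMember]
        public Type KeyType { get; set; }              // type of the changed entity

        [DataMember]
        public RedisAlteration Alteration { get; set; }

        [DataMember]
        public string SerializedPayload { get; set; }  // JSON of the changed entity

        [DataMember]
        public string PublisherId { get; set; }        // filled in by RedisPubSubManager.Publish
    }

    // Sketch: both values are placeholders, not taken from the sample download.
    public static class RedisConstants
    {
        public const string RedisCommonChannel = "redis-sync";
        public const string Stop = "STOP";
    }

    // Hypothetical helper, for illustration only.
    public static class SyncInfoRoundTrip
    {
        // Serializes a message the way Publish does and reads it back
        // the way SubscribeEvent and OnRedisAlteration do.
        public static object Run(object entity)
        {
            var outgoing = new RedisSyncInfoDataContract
            {
                KeyType = entity.GetType(),
                Alteration = RedisAlteration.Added,
                SerializedPayload = JsonConvert.SerializeObject(entity)
            };
            string wire = JsonConvert.SerializeObject(outgoing);

            var incoming = JsonConvert.DeserializeObject<RedisSyncInfoDataContract>(wire);
            return JsonConvert.DeserializeObject(incoming.SerializedPayload, incoming.KeyType);
        }
    }
}
```

The object returned by Run is a new instance, not the one that was passed in, which is why a receiver cannot find the matching cached entry by reference.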

I have created a simple unit test library for testing out the Redis provider shown above. Example unit test:

using System;
using NUnit.Framework;
using System.Collections.Generic;
using System.Threading;

namespace RedisClientPubSub.Test
{

    public class Vehicle
    {

        public string Model { get; set; }

        public string Make { get; set; }

        public string Color { get; set; }

        public int Id { get; set; }

    }
  
    [TestFixture]
    public class RedisMemoryProviderTest
    {

        private RedisMemoryProvider<Vehicle> _redisMemoryProvider;

        [TestFixtureSetUp]
        public void TestFixtureSetup()
        {
            _redisMemoryProvider = new RedisMemoryProvider<Vehicle>();
        }

        [Test]
        public void InsertRangeAndClearDoesNotThrow()
        {
            var car = new Vehicle { Id = 1, Make = "Audi", Model = "A4", Color = "Black" };
            var anotherCar = new Vehicle { Id = 2, Make = "BMW", Model = "M5", Color = "Blue" };
            var yetAnotherCar = new Vehicle { Id = 3, Make = "Ferrari", Model = "Ischigiera", Color = "Yellow" };

            _redisMemoryProvider.ClearAll<Vehicle>(); 
            _redisMemoryProvider.InsertRange(new List<Vehicle> { car, anotherCar, yetAnotherCar });
            Thread.Sleep(2000);
            var vehicles = _redisMemoryProvider.GetAll<Vehicle>();
            CollectionAssert.IsNotEmpty(vehicles); 
            Assert.AreEqual(3, _redisMemoryProvider.GetAll<Vehicle>().Count); 
            _redisMemoryProvider.ClearAll<Vehicle>();
            vehicles = _redisMemoryProvider.GetAll<Vehicle>();
            Assert.AreEqual(0, vehicles.Count); 
        }

    }
}


In addition, I have created a simple Windows Forms client. Launch multiple instances of it to see how the cache invalidations and pub-sub actions keep the clients in sync.
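
For readers without the Windows Forms project at hand, roughly the same experiment can be run from a console app. This is only a sketch under assumptions: it needs the RedisMemoryProvider listing above in the same project, a reachable Redis server in the RedisServer appSetting, and the Program class, the local Vehicle class and the console messages are made up for illustration. Start two instances and type an id in one of them; the other should print the remote change.

```csharp
using System;
using RedisClientPubSub;

// Same shape as the Vehicle class in the unit test above.
public class Vehicle
{
    public int Id { get; set; }
    public string Make { get; set; }
    public string Model { get; set; }
    public string Color { get; set; }
}

public static class Program
{
    public static void Main()
    {
        using (var provider = new RedisMemoryProvider<Vehicle>())
        {
            // Raised when another instance publishes a change on the common channel.
            provider.OnRedisAlterationInfo += info =>
                Console.WriteLine("Remote change: " + info.Alteration + " " + info.KeyType.Name);

            Console.WriteLine("Enter an id to add a vehicle, or a blank line to quit.");
            string line;
            while (!string.IsNullOrWhiteSpace(line = Console.ReadLine()))
            {
                int id;
                if (!int.TryParse(line, out id))
                    continue;

                // Stores the entity in Redis and notifies the other instances.
                provider.Create(new Vehicle { Id = id, Make = "Audi", Model = "A4", Color = "Black" });
                Console.WriteLine("Locally cached vehicles: " + provider.FindBy<Vehicle>(v => true).Count);
            }
        }
    }
}
```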


Download Visual Studio Solution (2013) of the sample code above
Download VS solution [ZIP 6,0 MB]
Before running the sample, you will need to change the appSetting RedisServer and point it to a Redis server instance. You can just start a Redis server on your local machine for example.
Download Redis from here:
Redis.io website

Comments:

  1. Thank you very much for this.... It seems to be just what I needed.

    How would you instantiate the RedisMemoryProvider with a generic type? Since I don't want to have to create a new RedisMemoryProvider for each type in my app (which would be a lot).