- The code should use C# and Redis.
- The C# Redis provider used is ServiceStack.Redis
- Cache invalidation across clients should notify all subscribers whenever a change has been made and the cache has been invalidated.
- A local in-memory cache should be kept for performance reasons
- The wrapper/provider should support a full CRUD scenario and offer convenience methods such as InsertRange() and ClearAll().
- It should be easy to point the code to another Redis server.
- A multiple-readers scenario should be supported using Redis Pub/Sub functionality.
using Newtonsoft.Json;
using ServiceStack.Redis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RedisClientPubSub
{
    // Based on: http://stackoverflow.com/questions/30818784/generic-object-cache

    /// <summary>
    /// Redis-backed provider that mirrors entities in a local in-memory cache and
    /// keeps that cache in sync with other clients through Redis publish/subscribe.
    /// </summary>
    /// <remarks>
    /// Most generic methods declare their own type parameter that is also called T and
    /// therefore hides the class-level T; inside those methods <c>typeof(T)</c> is the
    /// method's type argument. The signatures are left as they are so existing callers
    /// and the implemented interface keep working.
    /// The Redis server address is read from the <c>RedisServer</c> appSetting.
    /// </remarks>
    /// <typeparam name="T">Entity type that is pre-loaded into the local cache.</typeparam>
    public class RedisMemoryProvider<T> : IDisposable, RedisClientPubSub.IRedisMemoryProvider<T>
    {
        /// <summary>Name of the appSetting that holds the Redis server address.</summary>
        private const string RedisServerSetting = "RedisServer";

        private static readonly PooledRedisClientManager m =
            new PooledRedisClientManager(new string[] { ConfigurationManager.AppSettings[RedisServerSetting] });

        private static readonly RedisPubSubManager _redisPubsub = new RedisPubSubManager();

        private readonly IDictionary<Type, List<object>> _cache = new ConcurrentDictionary<Type, List<object>>();

        // Guards the List<object> instances stored in _cache. The lists are touched both by
        // callers of this class and by the pub/sub callback, which arrives on another thread.
        private readonly object _gate = new object();

        public delegate void RedisAlterationInfoHandler(RedisSyncInfoDataContract syncInfo);

        /// <summary>
        /// Raised after an alteration published by another client has been applied to the local cache.
        /// </summary>
        public event RedisAlterationInfoHandler OnRedisAlterationInfo;

        /// <summary>
        /// Loads all stored {T} into the local cache and starts listening for remote alterations.
        /// </summary>
        public RedisMemoryProvider()
        {
            LoadIntoCache();
            _redisPubsub.OnRedisAlteration += OnRedisAlteration;
        }

        /// <summary>
        /// Detaches this instance from the shared pub/sub manager. The subscription itself
        /// is left open; call <see cref="CloseSubscription"/> to end it.
        /// </summary>
        public void Dispose()
        {
            _redisPubsub.OnRedisAlteration -= OnRedisAlteration;
        }

        /// <summary>
        /// Closes the Redis subscription. The pub/sub manager is a static field, so this
        /// affects every provider instance of the same closed generic type.
        /// </summary>
        public void CloseSubscription()
        {
            _redisPubsub.CloseSubscription();
        }

        /// <summary>
        /// Applies an alteration published by another client to the local cache only.
        /// Nothing is written back to Redis and nothing is re-published.
        /// </summary>
        /// <param name="syncInfo">Alteration message received from the common channel.</param>
        private void OnRedisAlteration(RedisSyncInfoDataContract syncInfo)
        {
            if (syncInfo == null || syncInfo.KeyType == null || syncInfo.SerializedPayload == null)
            {
                return;
            }

            var item = JsonConvert.DeserializeObject(syncInfo.SerializedPayload, syncInfo.KeyType);
            if (item == null)
            {
                return;
            }

            // The payload is only known as System.Object at compile time, so the cache is
            // addressed through syncInfo.KeyType instead of a generic type argument
            // (which would be inferred as object and hit the wrong cache entry).
            bool changeMade = true;
            switch (syncInfo.Alteration)
            {
                case RedisAlteration.Added:
                    AddLocal(syncInfo.KeyType, new[] { item });
                    break;
                case RedisAlteration.Deleted:
                    RemoveLocal(syncInfo.KeyType, item);
                    break;
                case RedisAlteration.Invalidated:
                    ReplaceLocal(syncInfo.KeyType, item);
                    break;
                default:
                    changeMade = false;
                    break;
            }

            if (!changeMade)
            {
                return;
            }

            // Copy the delegate so a concurrent unsubscribe cannot null it between check and call.
            var handler = OnRedisAlterationInfo;
            if (handler != null)
            {
                handler(syncInfo);
            }
        }

        /// <summary>
        /// Loads every stored {T} from Redis into the local cache.
        /// </summary>
        private void LoadIntoCache()
        {
            var loaded = GetAll<T>().Cast<object>().ToList();
            lock (_gate)
            {
                _cache[typeof(T)] = loaded;
            }
        }

        /// <summary>
        /// Finds the first cached {T} that satisfies the predicate.
        /// </summary>
        /// <param name="predicate">Filter applied to the cached items.</param>
        /// <returns>The first match, or default(T) when nothing matches or the type is not cached.</returns>
        public T Read(Func<T, bool> predicate)
        {
            lock (_gate)
            {
                List<object> list;
                if (_cache.TryGetValue(typeof(T), out list))
                {
                    return list.Cast<T>().Where(predicate).FirstOrDefault();
                }
            }

            return default(T);
        }

        /// <summary>
        /// Finds all cached items of the requested type that satisfy the predicate.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="predicate">Filter applied to the cached items.</param>
        /// <returns>A new list with the matches; empty when the type is not cached.</returns>
        public List<T> FindBy<T>(Func<T, bool> predicate) where T : class
        {
            lock (_gate)
            {
                List<object> list;
                if (_cache.TryGetValue(typeof(T), out list))
                {
                    return list.Cast<T>().Where(predicate).ToList();
                }
            }

            return new List<T>();
        }

        /// <summary>
        /// Reads a single item by id directly from Redis, bypassing the local cache.
        /// </summary>
        public T FindById<T>(long id)
        {
            using (var ctx = m.GetClient())
            {
                return ctx.GetById<T>(id);
            }
        }

        /// <summary>
        /// Reads several items by id directly from Redis, bypassing the local cache.
        /// </summary>
        public IList<T> FindByIds<T>(long[] ids)
        {
            using (var ctx = m.GetClient())
            {
                return ctx.GetByIds<T>(ids);
            }
        }

        /// <summary>
        /// Adds an entity to the local cache and, unless <paramref name="onlyLocal"/> is set,
        /// stores it in Redis and publishes the addition.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="entity">Entity to add.</param>
        /// <param name="onlyLocal">When true only the local cache is changed.</param>
        public void Create<T>(T entity, bool onlyLocal = false) where T : class
        {
            AddLocal(typeof(T), new object[] { entity });

            if (!onlyLocal)
            {
                Store<T>(entity, RedisAlteration.Added);
            }
        }

        /// <summary>
        /// Adds several entities to the local cache and, unless <paramref name="onlyLocal"/>
        /// is set, stores and publishes each of them.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="entitites">Entities to add.</param>
        /// <param name="onlyLocal">When true only the local cache is changed.</param>
        public void InsertRange<T>(IList<T> entitites, bool onlyLocal = false) where T : class
        {
            AddLocal(typeof(T), entitites);

            if (onlyLocal)
            {
                return;
            }

            foreach (var entity in entitites)
            {
                Store<T>(entity, RedisAlteration.Added);
            }
        }

        /// <summary>
        /// Removes an entity from the local cache and, unless <paramref name="onlyLocal"/>
        /// is set, deletes it from Redis and publishes the deletion.
        /// Nothing happens when the type has no local cache entry.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="entity">Entity to delete.</param>
        /// <param name="onlyLocal">When true only the local cache is changed.</param>
        public void Delete<T>(T entity, bool onlyLocal = false) where T : class
        {
            if (!RemoveLocal(typeof(T), entity))
            {
                return;
            }

            if (!onlyLocal)
            {
                RedisDelete<T>(entity, RedisAlteration.Deleted);
            }
        }

        /// <summary>
        /// Empties the local cache for the type and, unless <paramref name="onlyLocal"/> is
        /// set, deletes every item that was cached from Redis and publishes each deletion.
        /// Only items present in the local cache are deleted from Redis.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="onlyLocal">When true only the local cache is changed.</param>
        public void ClearAll<T>(bool onlyLocal = false) where T : class
        {
            List<T> removed;
            lock (_gate)
            {
                List<object> list;
                if (!_cache.TryGetValue(typeof(T), out list))
                {
                    return;
                }

                removed = list.Cast<T>().ToList();
                list.Clear();
            }

            if (onlyLocal)
            {
                return;
            }

            // Redis is contacted outside the lock so readers are not blocked by network I/O.
            foreach (var entity in removed)
            {
                RedisDelete<T>(entity, RedisAlteration.Deleted);
            }
        }

        /// <summary>
        /// Gets the next sequence value for the type from Redis.
        /// </summary>
        /// <returns>The next sequence value, or 1 when the call to Redis fails.</returns>
        public long Next<T>() where T : class
        {
            long id = 1;
            using (var ctx = m.GetClient())
            {
                try
                {
                    id = ctx.As<T>().GetNextSequence();
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.Message);
                }
            }

            return id;
        }

        /// <summary>
        /// Reads all items of the type directly from Redis.
        /// </summary>
        /// <returns>The stored items, or an empty list when the call to Redis fails.</returns>
        public IList<T> GetAll<T>()
        {
            using (var ctx = m.GetClient())
            {
                try
                {
                    return ctx.As<T>().GetAll();
                }
                catch (Exception err)
                {
                    Debug.WriteLine(err.Message);
                    return new List<T>();
                }
            }
        }

        /// <summary>
        /// Replaces the first cached item matching the predicate with <paramref name="entity"/>
        /// (or simply adds it when nothing matches) and, unless <paramref name="onlyLocal"/>
        /// is set, stores it in Redis and publishes the change.
        /// Nothing happens when the type has no local cache entry.
        /// </summary>
        /// <typeparam name="T">class</typeparam>
        /// <param name="predicate">Selects the cached item to replace.</param>
        /// <param name="entity">New version of the entity.</param>
        /// <param name="onlyLocal">When true only the local cache is changed.</param>
        public void Update<T>(Func<T, bool> predicate, T entity, bool onlyLocal = false) where T : class
        {
            lock (_gate)
            {
                List<object> list;
                if (!_cache.TryGetValue(typeof(T), out list))
                {
                    return;
                }

                var existing = list.Cast<T>().FirstOrDefault(predicate);
                if (existing != null)
                {
                    list.Remove(existing);
                }

                list.Add(entity);
            }

            if (!onlyLocal)
            {
                Store<T>(entity, RedisAlteration.Invalidated);
            }
        }

        /// <summary>
        /// Sets a time-to-live on a key. Despite the name, the value is a duration in
        /// seconds that is handed to the client's Expire call, not an absolute time.
        /// </summary>
        public bool ExpireAt(string keyName, int expireInSeconds)
        {
            using (var client = CreateNativeClient())
            {
                return client.Expire(keyName, expireInSeconds);
            }
        }

        /// <summary>
        /// Returns the value of the client's Ttl call for the key.
        /// </summary>
        public long GetTtl(string keyName)
        {
            using (var client = CreateNativeClient())
            {
                return client.Ttl(keyName);
            }
        }

        /// <summary>
        /// Stores a string value under the key, encoded as UTF-8.
        /// </summary>
        public void Set(string keyName, string content)
        {
            using (var client = CreateNativeClient())
            {
                client.Set(keyName, Encoding.UTF8.GetBytes(content));
            }
        }

        /// <summary>
        /// Reads a UTF-8 encoded string value.
        /// </summary>
        /// <returns>The stored string, or null when the client returns no bytes for the key.</returns>
        public string Get(string keyName)
        {
            using (var client = CreateNativeClient())
            {
                // Encoding.GetString throws on a null buffer, so a missing value is mapped to null.
                byte[] raw = client.Get(keyName);
                return raw == null ? null : Encoding.UTF8.GetString(raw);
            }
        }

        /// <summary>
        /// Returns the server information exposed by the client's Info property.
        /// </summary>
        public IDictionary<string, string> GetInfo()
        {
            using (var client = CreateNativeClient())
            {
                return client.Info;
            }
        }

        /// <summary>
        /// Pings the Redis server.
        /// </summary>
        public bool Ping()
        {
            using (var client = CreateNativeClient())
            {
                return client.Ping();
            }
        }

        #region Private methods

        /// <summary>
        /// Creates a native client for the server named in the RedisServer appSetting.
        /// </summary>
        private static RedisNativeClient CreateNativeClient()
        {
            return new RedisNativeClient(ConfigurationManager.AppSettings[RedisServerSetting]);
        }

        /// <summary>
        /// Appends entities to the cached list for the type, creating the list when needed.
        /// </summary>
        private void AddLocal(Type type, IEnumerable<object> entities)
        {
            lock (_gate)
            {
                List<object> list;
                if (!_cache.TryGetValue(type, out list))
                {
                    list = new List<object>();
                    _cache[type] = list;
                }

                list.AddRange(entities);
            }
        }

        /// <summary>
        /// Removes the cached counterpart of <paramref name="entity"/>, if there is one.
        /// </summary>
        /// <returns>True when the type has a local cache entry, whether or not an item was removed.</returns>
        private bool RemoveLocal(Type type, object entity)
        {
            lock (_gate)
            {
                List<object> list;
                if (!_cache.TryGetValue(type, out list))
                {
                    return false;
                }

                int index = IndexOfEntity(list, entity);
                if (index >= 0)
                {
                    list.RemoveAt(index);
                }

                return true;
            }
        }

        /// <summary>
        /// Swaps the cached counterpart of <paramref name="entity"/> for the new version.
        /// Does nothing when the type has no local cache entry.
        /// </summary>
        private void ReplaceLocal(Type type, object entity)
        {
            lock (_gate)
            {
                List<object> list;
                if (!_cache.TryGetValue(type, out list))
                {
                    return;
                }

                int index = IndexOfEntity(list, entity);
                if (index >= 0)
                {
                    list.RemoveAt(index);
                }

                list.Add(entity);
            }
        }

        /// <summary>
        /// Locates an entity in a cached list: first by Equals, then by identity
        /// (see <see cref="IsSameEntity"/>), because an item that was deserialized from a
        /// pub/sub message is never the same instance as the cached one.
        /// </summary>
        private static int IndexOfEntity(List<object> list, object entity)
        {
            int index = list.IndexOf(entity);
            if (index >= 0 || entity == null)
            {
                return index;
            }

            return list.FindIndex(candidate => IsSameEntity(candidate, entity));
        }

        /// <summary>
        /// Decides whether two instances of the same type describe the same stored entity.
        /// A readable, non-indexed property named Id is compared when the type has one;
        /// otherwise the JSON representations are compared.
        /// </summary>
        /// <remarks>
        /// NOTE(review): relying on a property called Id follows the ServiceStack.Redis
        /// typed-client convention - confirm for entities that use another key.
        /// Without an Id property an updated entity cannot be matched to its stale cached
        /// version, so an Invalidated message appends rather than replaces.
        /// </remarks>
        private static bool IsSameEntity(object cached, object incoming)
        {
            if (cached == null || cached.GetType() != incoming.GetType())
            {
                return false;
            }

            var idProperty = cached.GetType().GetProperty("Id");
            if (idProperty != null && idProperty.CanRead && idProperty.GetIndexParameters().Length == 0)
            {
                return object.Equals(idProperty.GetValue(cached, null), idProperty.GetValue(incoming, null));
            }

            return JsonConvert.SerializeObject(cached) == JsonConvert.SerializeObject(incoming);
        }

        /// <summary>
        /// Stores the entity in Redis and publishes the alteration to the other clients.
        /// </summary>
        private void Store<T>(T entity, RedisAlteration alteration) where T : class
        {
            using (var ctx = m.GetClient())
            {
                ctx.Store<T>(entity);
                PublishChange<T>(entity, alteration);
            }
        }

        /// <summary>
        /// Publishes an alteration message that carries the entity serialized as JSON.
        /// </summary>
        private static void PublishChange<T>(T entity, RedisAlteration alteration) where T : class
        {
            _redisPubsub.Publish(new RedisSyncInfoDataContract
            {
                KeyType = typeof(T),
                Alteration = alteration,
                SerializedPayload = JsonConvert.SerializeObject(entity)
            });
        }

        /// <summary>
        /// Deletes the entity from Redis and publishes the alteration to the other clients.
        /// </summary>
        private void RedisDelete<T>(T entity, RedisAlteration alteration) where T : class
        {
            using (var ctx = m.GetClient())
            {
                ctx.As<T>().Delete(entity);
                PublishChange<T>(entity, alteration);
            }
        }

        #endregion
    }
}

Note that the class above also contains logic for publishing changes to Redis for the different types of cache invalidations that can happen, such as insertions, updates and deletions. The code uses the Nuget package ServiceStack.Redis for communicating against Redis. You will need an appSetting where the FQDN of your Redis Server is inserted. We need some more code to do the Subscriptions and publishing routines against Redis:
using Newtonsoft.Json;
using ServiceStack.Redis;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;

namespace RedisClientPubSub
{
    /// <summary>
    /// Publishes cache alterations to, and listens for them on, one common Redis channel.
    /// Messages that this instance published itself are ignored on receipt.
    /// The Redis server address is read from the <c>RedisServer</c> appSetting.
    /// </summary>
    public class RedisPubSubManager : IDisposable, RedisClientPubSub.IRedisPubSubManager
    {
        // Assigned before the listener starts so the self-message filter in SubscribeEvent
        // never compares against an unset id.
        private readonly string _subscriptionId = Guid.NewGuid().ToString();

        // Written by the listener task and read by CloseSubscription on the caller's thread.
        private volatile IRedisSubscription _subscription;

        public delegate void RedisAlterationHandler(RedisSyncInfoDataContract syncInfo);

        /// <summary>
        /// Raised for every alteration message received from another publisher.
        /// The handler runs on the listener's background thread.
        /// </summary>
        public event RedisAlterationHandler OnRedisAlteration;

        /// <summary>
        /// Creates the manager and starts listening on the common channel in the background.
        /// </summary>
        public RedisPubSubManager()
        {
            CreateSubscription();
        }

        /// <summary>Identifier stamped on published messages as their PublisherId.</summary>
        public string SubscriptionId
        {
            get { return _subscriptionId; }
        }

        /// <summary>Closes the subscription.</summary>
        public void Dispose()
        {
            CloseSubscription();
        }

        /// <summary>
        /// Unsubscribes from all channels. Failures are written to the debug output and not
        /// rethrown. Has no effect when the listener has not registered its subscription yet
        /// or has already ended.
        /// </summary>
        public void CloseSubscription()
        {
            var subscription = _subscription;
            if (subscription == null)
            {
                return;
            }

            try
            {
                subscription.UnSubscribeFromAllChannels();
            }
            catch (Exception err)
            {
                Debug.WriteLine(err.Message);
            }
        }

        /// <summary>
        /// Starts a background task that subscribes to the common channel and forwards
        /// incoming messages to <see cref="SubscribeEvent"/>.
        /// </summary>
        private void CreateSubscription()
        {
            // LongRunning: the task is expected to block inside SubscribeToChannels for as
            // long as the subscription is active, so it should not occupy a pool thread.
            Task.Factory.StartNew(() =>
            {
                try
                {
                    using (var redisClient = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
                    {
                        var subscription = redisClient.CreateSubscription();
                        subscription.OnMessage = (channel, message) =>
                        {
                            if (message == RedisConstants.Stop)
                            {
                                // The stop message is a control message, not a JSON payload,
                                // so it must not be handed to the deserializer.
                                subscription.UnSubscribeFromAllChannels();
                                return;
                            }

                            SubscribeEvent(channel, message);
                        };

                        // Must be published BEFORE subscribing: per the ServiceStack.Redis
                        // Pub/Sub documentation SubscribeToChannels blocks, so an assignment
                        // placed after it would leave CloseSubscription with nothing to close.
                        _subscription = subscription;
                        try
                        {
                            subscription.SubscribeToChannels(new[] { RedisConstants.RedisCommonChannel });
                        }
                        finally
                        {
                            // The client is about to be disposed; do not leave a stale reference.
                            _subscription = null;
                        }
                    } //using
                }
                catch (Exception err)
                {
                    // Nobody observes this task, so report connection failures here.
                    Debug.WriteLine(err.Message);
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Stamps the message with this instance's id, serializes it to JSON and publishes
        /// it on the common channel.
        /// </summary>
        /// <param name="redisItem">Alteration message to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="redisItem"/> is null.</exception>
        public void Publish(RedisSyncInfoDataContract redisItem)
        {
            if (redisItem == null)
            {
                throw new ArgumentNullException("redisItem");
            }

            using (var redisClient = new RedisNativeClient(ConfigurationManager.AppSettings["RedisServer"]))
            {
                redisItem.PublisherId = _subscriptionId;
                var jsonSerialized = JsonConvert.SerializeObject(redisItem);
                redisClient.Publish(RedisConstants.RedisCommonChannel, Encoding.UTF8.GetBytes(jsonSerialized));
            } //using
        }

        /// <summary>
        /// Deserializes a received message and raises <see cref="OnRedisAlteration"/> unless
        /// the message was published by this instance. Exceptions, including those thrown by
        /// subscribers, are written to the debug output and not rethrown.
        /// </summary>
        private void SubscribeEvent(string channel, string message)
        {
            try
            {
                var syncInfo = JsonConvert.DeserializeObject<RedisSyncInfoDataContract>(message);
                if (syncInfo == null)
                {
                    return;
                }

                if (syncInfo.PublisherId == _subscriptionId)
                {
                    return; //published from the same node
                }

                // Copy the delegate so a concurrent unsubscribe cannot null it between check and call.
                var handler = OnRedisAlteration;
                if (handler != null)
                {
                    handler(syncInfo);
                }
            }
            catch (Exception err)
            {
                Debug.WriteLine(err.Message);
            }
        }
    }
}

Changes across clients are published and the data that was changed is serialized using Nuget package Newtonsoft.Json. The serialized payload is then deserialized on the receiving side. Note that only clients that differ in their PublisherId will receive the updates. After all, we design with one common channel that the clients or readers subscribe on, therefore we want to avoid doing any actions with the client or Reader that performed the update. The alterations and cache invalidations that can happen is defined in an enum:
using System;
using System.Runtime.Serialization;

namespace RedisClientPubSub
{
    /// <summary>
    /// Kind of change that a client announces to the other clients.
    /// </summary>
    /// <remarks>
    /// The members are mutually exclusive values, not bit flags: Invalidated (3) would be
    /// indistinguishable from Added | Deleted in a flags enum, so the type is deliberately
    /// not marked with [Flags]. The numeric values are unchanged, which keeps messages
    /// exchanged with already-deployed clients compatible.
    /// </remarks>
    [DataContract]
    public enum RedisAlteration
    {
        /// <summary>No change.</summary>
        [EnumMember]
        None = 0,

        /// <summary>An item was added.</summary>
        [EnumMember]
        Added = 1,

        /// <summary>An item was deleted.</summary>
        [EnumMember]
        Deleted = 2,

        /// <summary>An item was updated and cached copies of it are stale.</summary>
        [EnumMember]
        Invalidated = 3
    }
}

I have created a simple unit test library for testing out the Redis provider shown above. Example unit test:
using System;
using NUnit.Framework;
using System.Collections.Generic;
using System.Threading;

namespace RedisClientPubSub.Test
{
    /// <summary>
    /// Simple entity used by the tests. It exposes an integer Id property.
    /// </summary>
    public class Vehicle
    {
        public string Model { get; set; }

        public string Make { get; set; }

        public string Color { get; set; }

        public int Id { get; set; }
    }

    /// <summary>
    /// Integration tests for <see cref="RedisMemoryProvider{T}"/>. They need a reachable
    /// Redis server configured through the RedisServer appSetting.
    /// </summary>
    [TestFixture]
    public class RedisMemoryProviderTest
    {
        private RedisMemoryProvider<Vehicle> _redisMemoryProvider;

        [TestFixtureSetUp]
        public void TestFixtureSetup()
        {
            _redisMemoryProvider = new RedisMemoryProvider<Vehicle>();
        }

        /// <summary>
        /// Releases the provider so the fixture does not leave an event subscription behind.
        /// </summary>
        [TestFixtureTearDown]
        public void TestFixtureTearDown()
        {
            if (_redisMemoryProvider == null)
            {
                return;
            }

            _redisMemoryProvider.CloseSubscription();
            _redisMemoryProvider.Dispose();
        }

        /// <summary>
        /// Inserts three vehicles, verifies that all three can be read back from Redis,
        /// then clears them and verifies that none are left.
        /// </summary>
        [Test]
        public void InsertRangeAndClearDoesNotThrow()
        {
            var car = new Vehicle { Id = 1, Model = "Audi", Make = "A4", Color = "Black" };
            var anotherCar = new Vehicle { Id = 2, Model = "BMW", Make = "M5", Color = "Blue" };
            var yetAnotherCar = new Vehicle { Id = 3, Model = "Ferrari", Make = "Ischigiera", Color = "Yellow" };

            _redisMemoryProvider.ClearAll<Vehicle>();
            _redisMemoryProvider.InsertRange(new List<Vehicle> { car, anotherCar, yetAnotherCar });

            // No sleep is needed here: InsertRange stores each entity synchronously before
            // it returns, so the data can be read back immediately.
            var vehicles = _redisMemoryProvider.GetAll<Vehicle>();
            CollectionAssert.IsNotEmpty(vehicles);
            Assert.AreEqual(3, vehicles.Count);

            _redisMemoryProvider.ClearAll<Vehicle>();

            vehicles = _redisMemoryProvider.GetAll<Vehicle>();
            Assert.AreEqual(0, vehicles.Count);
        }
    }
}

In addition, I have created a simple Windows Forms Client that one can launch multiple instances of to test out how the cache invalidations and pub-sub actions keep the multiple clients in sync.
Download Visual Studio Solution (2013) of the sample code above
Download VS solution [ZIP 6,0 MB]
Before running the sample, you will need to change the appSetting RedisServer and point it to a Redis server instance. You can just start a Redis server on your local machine for example.
Download Redis from here:
Redis.io website
Thank you very much for this.... It seems to be just what i needed.
ReplyDeleteHow would you instantiate the RedisMemoryProvider with a generic type? Since i don't want to have to create a new RedisMemoryProvider for each type in my app (which would be a lot).
Thank you very much for this.... It seems to be just what i needed.
ReplyDeleteHow would you instantiate the RedisMemoryProvider with a generic type? Since i don't want to have to create a new RedisMemoryProvider for each type in my app (which would be a lot).
If you are looking into generating cash from your websites or blogs with popunder ads, you should try one of the biggest companies - Clickadu.
ReplyDeleteThe data nowadays is where a digital rebellion has taken place the whole industry especially transportation and logistics. You want to know IT recruitment Ukraine the key areas to focus on during negotiations with transfer vendors. This may provide the grant to have a substantial discount on the standard contract. They will most likely get deals for small vehicle profiles. FedEx and UPS are looking for perfect transport profiles with heavy B2B ground loads on efficient travel routes.
ReplyDeleteSo wonderful to discover somebody with a few genuine thoughts on this subject. 일본야동
ReplyDeleteClick this link
한국야동
Nice post. I learn something totally new and challenging on websites 한국야동닷컴
ReplyDeleteClick this link
국산야동
There’s definitely a lot to know about this issue. I really like all the points you made. 국산야동
ReplyDeleteClick this link
야설
That’s why marketing and advertising that you simply applicable exploration previous to publishing. 중국야동넷
ReplyDeleteClick this link
야설
Today I want to share with you a cool outsource data entry services that will help you improve the quality of your services, programs and projects. This service is necessary in order to automate and systematize your processes. This is a very smart algorithm that was created by experienced developers.
ReplyDeleteThis comment has been removed by the author.
ReplyDelete