Wednesday, January 2, 2013

Ehcache refresh on DB change from external source

Problem statement:

  1. I have two web applications, WebApp-1 and WebApp-2, that access the same database, DB-1, using Hibernate 3.6.
  2. WebApp-2 uses an L2 (second-level) cache, Ehcache, whereas WebApp-1 doesn't use any L2 caching.
  3. When WebApp-1 modifies the DB state, the L2 cache of WebApp-2 doesn't reflect the change, so modifications made by WebApp-1 are not visible to WebApp-2.
Problem solution: 
I found these three solutions in the Ehcache documentation.

Use one of the following strategies to keep the data in the cache in sync:
  • data expiration: use the eviction algorithms included with Ehcache along with the timeToIdleSeconds and timeToLiveSeconds settings to enforce a maximum time for elements to live in the cache (forcing a re-load from the database or SOR).
  • message bus: use an application to make all updates to the database. When updates are made, post a message onto a message queue with a key to the item that was updated. All application instances can subscribe to the message bus and receive messages about data that is updated, and can synchronize their local copy of the data accordingly (for example by invalidating the cache entry for updated data).
  • triggers: Using a database trigger can accomplish a similar task as the message bus approach. Use the database trigger to execute code that can publish a message to a message bus. The advantage to this approach is that updates to the database do not have to be made only through a special application. The downside is that not all database triggers support full execution environments and it is often unadvisable to execute heavy-weight processing such as publishing messages on a queue during a database trigger.
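To make the first strategy concrete, here is a minimal sketch of how time-to-live and time-to-idle expiry differ. It is not Ehcache's implementation: the class ExpiringCacheSketch, its methods, and the explicit clock argument are all hypothetical, and the exact boundary behaviour (and Ehcache's "0 means no limit" convention) is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: models timeToLive vs. timeToIdle expiry with an explicit clock. */
public class ExpiringCacheSketch {

    private static final class Entry {
        final Object value;
        final long createdAt;      // seconds, set once on put
        long lastAccessedAt;       // seconds, refreshed on every hit

        Entry(Object value, long now) {
            this.value = value;
            this.createdAt = now;
            this.lastAccessedAt = now;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long timeToLiveSeconds;
    private final long timeToIdleSeconds;

    public ExpiringCacheSketch(long timeToLiveSeconds, long timeToIdleSeconds) {
        this.timeToLiveSeconds = timeToLiveSeconds;
        this.timeToIdleSeconds = timeToIdleSeconds;
    }

    public void put(String key, Object value, long nowSeconds) {
        entries.put(key, new Entry(value, nowSeconds));
    }

    /** Returns the cached value, or null if absent or expired (the caller would then reload from the DB). */
    public Object get(String key, long nowSeconds) {
        Entry e = entries.get(key);
        if (e == null) {
            return null;
        }
        boolean tooOld = nowSeconds - e.createdAt >= timeToLiveSeconds;
        boolean tooIdle = nowSeconds - e.lastAccessedAt >= timeToIdleSeconds;
        if (tooOld || tooIdle) {
            entries.remove(key);
            return null;
        }
        e.lastAccessedAt = nowSeconds;
        return e.value;
    }

    public static void main(String[] args) {
        ExpiringCacheSketch cache = new ExpiringCacheSketch(60, 20);
        cache.put("Entity:42", "row-v1", 0);
        System.out.println(cache.get("Entity:42", 10)); // hit
        System.out.println(cache.get("Entity:42", 25)); // hit: idle for only 15 s
        System.out.println(cache.get("Entity:42", 50)); // miss: idle for 25 s
    }
}
```

The trade-off is visible in the sketch: with expiry alone, WebApp-2 can serve stale data for up to the configured interval after WebApp-1 writes, which is why the other two strategies notify the cache explicitly.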
I considered the message bus (JMS-style) solution, but did not use it. What I used instead is a lightweight broadcasting solution based on UDP multicast.

So, for my problem, WebApp-1 broadcasts every change event, and WebApp-2 starts a thread that listens for the broadcast messages sent by WebApp-1. Here are the code snippets for the broadcaster and the receiver.

Broadcaster:

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CacheEventBroadcaster extends Thread {

        private String entityName = null;
        private Integer entityId = null;

        private DatagramSocket socket = null;

        // Log under this class rather than an unrelated controller
        private Logger logger = LoggerFactory.getLogger(CacheEventBroadcaster.class);

        
        public CacheEventBroadcaster(String entityName, Integer entityId)
                        throws IOException {                
                this("CacheEventThread", entityName, entityId);
        }

        public CacheEventBroadcaster(String name, String entityName,
                        Integer entityId) throws IOException {
                super(name);
                this.entityId = entityId;
                this.entityName = entityName;
                logger.trace("Starting cache event broadcaster...");
        }

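        public void run() {
                try {
                        boolean connected = false;
                        while (!connected) {
                                // Create the datagram socket; port 4445 may still be held
                                // by another broadcaster thread, so retry until it is free
                                try {
                                        socket = new DatagramSocket(4445);
                                        // Leave the loop only once the socket is really open
                                        connected = true;
                                } catch (Exception e) {
                                        // do nothing but retry
                                        logger.trace("Exception: " + e.getMessage());
                                }
                        }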

                        // Create the message payload: "entityName:entityId"
                        String dString = entityName + ":" + entityId;
                        byte[] buf = dString.getBytes();
                        logger.trace("Broadcast message created : [" + dString + "]");

                        // Address the packet to the multicast group the receiver joins
                        InetAddress group = InetAddress.getByName("230.0.0.1");
                        DatagramPacket packet = new DatagramPacket(buf, buf.length, group,
                                        4446);
                        logger.trace("Sending message to : " + group.getHostAddress());

                        // Send the message to the multicast group
                        socket.send(packet);
                        logger.trace("Message sent successfully.");
                } catch (IOException e) {
                        logger.error("An error occurred while broadcasting the message. "
                                        + e.getMessage(), e);
                } finally {
                        // Always close the socket so port 4445 is released for the next broadcaster
                        if (socket != null) {
                                socket.close();
                        }
                }
        }

        public static void broadcastCacheEvent(String entityName, Integer entityId) {
                try {
                        new CacheEventBroadcaster(entityName, entityId).start();
                } catch (IOException e) {
                        e.printStackTrace();
                }
        }
}


Use this piece of code to send the broadcast message.

CacheEventBroadcaster.broadcastCacheEvent("EntityName", id);
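
The broadcaster and the receiver have to agree on the payload, which is a plain "entityName:entityId" string. As a minimal sketch, that format could live in one small helper shared by both sides. The class CacheEventMessage and its methods are hypothetical and not part of the code in this post, and the explicit UTF-8 charset is an assumption (the listings here rely on the platform default charset).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes and parses the "entityName:entityId" datagram payload. */
public final class CacheEventMessage {

    public final String entityName;
    public final int entityId;

    public CacheEventMessage(String entityName, int entityId) {
        this.entityName = entityName;
        this.entityId = entityId;
    }

    /** Bytes to put into the DatagramPacket on the sending side. */
    public byte[] toBytes() {
        return (entityName + ":" + entityId).getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received packet buffer; returns null if the payload is malformed. */
    public static CacheEventMessage fromBytes(byte[] data, int offset, int length) {
        return parse(new String(data, offset, length, StandardCharsets.UTF_8));
    }

    /** Returns the parsed message, or null if the text is not "name:integer". */
    public static CacheEventMessage parse(String payload) {
        if (payload == null) {
            return null;
        }
        // Split on the last colon so the id is always the final field
        int sep = payload.lastIndexOf(':');
        if (sep <= 0 || sep == payload.length() - 1) {
            return null;
        }
        try {
            int id = Integer.parseInt(payload.substring(sep + 1));
            return new CacheEventMessage(payload.substring(0, sep), id);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        byte[] wire = new CacheEventMessage("EntityName", 42).toBytes();
        CacheEventMessage received = fromBytes(wire, 0, wire.length);
        System.out.println(received.entityName + " -> " + received.entityId);
        System.out.println(parse("garbage")); // malformed payloads yield null
    }
}
```

Returning null for malformed input lets a receiver loop skip a bad datagram instead of dying on an exception, which matters because any host on the network can send to the multicast group.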


Receiver: 

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.List;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;

import org.hibernate.Criteria;
import org.hibernate.criterion.Restrictions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.komli.prime.service.shared.cache.HibernateCacheProvider;
import com.komli.prime.service.shared.util.db.DatabaseFactory;

public class CacheEventsReceiver {
        static final Logger logger = LoggerFactory.getLogger(CacheEventsReceiver.class);
        public static void receiveCacheEvents() throws IOException {
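                logger.info("Cache event listener daemon thread started...");
                ReceiverThread receiver = new ReceiverThread();
                // Mark the thread as a daemon so the listener does not block JVM shutdown
                receiver.setDaemon(true);
                receiver.start();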
        }
}

class ReceiverThread extends Thread {

        static final String QUERY_CACHE = "org.hibernate.cache.StandardQueryCache";
        static final Logger logger = LoggerFactory.getLogger(ReceiverThread.class);
        static CacheManager cacheManager = HibernateCacheProvider.getCacheManager();

        public void run() {

                MulticastSocket socket;
                try {
                        socket = new MulticastSocket(4446);
                        InetAddress address = InetAddress.getByName("230.0.0.1");
                        socket.joinGroup(address);
                        DatagramPacket packet;
                        String entityName;
                        Integer entityId;

                        while (true) {

                                byte[] buf = new byte[1024];
                                packet = new DatagramPacket(buf, buf.length);
                                socket.receive(packet);

                                String message = new String(packet.getData(), 0,
                                                packet.getLength());

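                                String[] inputs = message.split(":");

                                // Expect "entityName:entityId"; skip anything else so that a
                                // malformed datagram cannot kill the listener thread
                                if (inputs.length == 2 && inputs[1].matches("\\d{1,9}")) {
                                        entityName = inputs[0];
                                        entityId = Integer.valueOf(inputs[1]);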

                                        logger.info("Received new cache event, message is "
                                                        + message);

                                        @SuppressWarnings("unchecked")
                                        Object entity = getById(
                                                        CacheObjectMap.getMappedClass(entityName), entityId);
                                        
                                        // Clear the query cache so cached query results are re-read from the database
                                        Cache queryCache = cacheManager.getCache(QUERY_CACHE);
                                        queryCache.removeAll();
                                        logger.info("Query Cache Cleared...");

                                        // Evict the entity from the second-level cache
                                        DatabaseFactory
                                                        .createDatabase()
                                                        .getSessionFactory()
                                                        .evict(CacheObjectMap.getMappedClass(entityName),
                                                                        entityId);
                                }
                        }
                } catch (IOException e) {
                        logger.error("An error occurred while receiving the message. "
                                                        + e.getMessage(), e);
                }

        }

        // Loads the entity of the given mapped class by id, or returns null if none exists
        @SuppressWarnings("unchecked")
        public <T> T getById(Class<T> c, long id) {
                org.hibernate.Session session = DatabaseFactory.createDatabase()
                                .getSessionFactory().getCurrentSession();
                org.hibernate.Transaction tx = session.beginTransaction();
                try {
                        Criteria criteria = session.createCriteria(c);
                        criteria.add(Restrictions.eq("id", (int) id));

                        List<T> result = (List<T>) criteria.list();
                        if (result == null || result.size() == 0)
                                return null;
                        return result.get(0);
                } finally {
                        // End the read-only transaction instead of leaving it open
                        tx.commit();
                }
        }
}


Call this once during WebApp-2 startup (for example, from a ServletContextListener) to register and start the receiver shown above.

try {
        CacheEventsReceiver.receiveCacheEvents();
} catch (IOException e) {
        e.printStackTrace();
}


CacheObjectMap is a string-to-class mapper. A sample is shown below.

import java.util.HashMap;

public class CacheObjectMap {

        private static final HashMap<String, Class<?>> classMap;

        static {
                classMap = new HashMap<String, Class<?>>();

                classMap.put("EntityName", com.komli.prime.service.shared.db.model.Entity.class);

        }

        // Returns the entity class registered under the given name, or null if there is none
        public static Class<?> getMappedClass(String className){
                return classMap.get(className);
        }
}
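
Because getMappedClass returns null for a name that was never registered, an unexpected entity name in a datagram ends up as a null class passed to Hibernate. One possible variation, sketched below, is a registry that fails fast with a clear message. EntityRegistrySketch and its methods are hypothetical names, and String.class only stands in for a real entity class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical variation of the string-to-class mapper that rejects unknown names. */
public final class EntityRegistrySketch {

    private final Map<String, Class<?>> classes;

    public EntityRegistrySketch(Map<String, Class<?>> classes) {
        // Defensive copy so later changes to the caller's map have no effect here
        this.classes = Collections.unmodifiableMap(new HashMap<String, Class<?>>(classes));
    }

    public boolean isRegistered(String entityName) {
        return classes.containsKey(entityName);
    }

    /** Returns the mapped class, or throws if the name was never registered. */
    public Class<?> require(String entityName) {
        Class<?> mapped = classes.get(entityName);
        if (mapped == null) {
            throw new IllegalArgumentException(
                    "No entity class registered for '" + entityName + "'");
        }
        return mapped;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> mappings = new HashMap<String, Class<?>>();
        mappings.put("EntityName", String.class); // stand-in for a real entity class
        EntityRegistrySketch registry = new EntityRegistrySketch(mappings);

        System.out.println(registry.require("EntityName").getName());
        System.out.println(registry.isRegistered("Unknown"));
    }
}
```

A receiver could check isRegistered first and log-and-skip unknown names, keeping the listener thread alive.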