Hello again, after more than 4 months :) So many things have changed in my life since my last blog post. I will try to talk about them from time to time, although I am not sure I actually will :)
I have been playing with Redis and its Java client Jedis lately, and they look very promising. I have been running some experiments with them and I really like what I see. I will probably use Redis in one of my personal projects for some use cases.
The code below is an example of a publish / subscribe use case with Jedis. You need the Jedis (my version is 2.0.0) and SLF4J jars on your classpath to run it.
In Redis, you can subscribe to multiple channels, and when someone publishes a message on one of those channels, Redis delivers it to you. Jedis provides this functionality through the JedisPubSub abstract class. To handle pub / sub events, you extend JedisPubSub and implement its abstract methods.
package com.basrikahveci.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {

    private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);

    // Called for every message published on a channel we subscribed to.
    @Override
    public void onMessage(String channel, String message) {
        logger.info("Message received. Channel: {}, Msg: {}", channel, message);
    }

    // The remaining callbacks are not needed for this example, so they stay empty.
    @Override
    public void onPMessage(String pattern, String channel, String message) {
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
    }
}
The class above is my dummy subscriber. I will use it to subscribe to a channel and log the messages that arrive.
My ugly main method is below.
package com.basrikahveci.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class Program {

    public static final String CHANNEL_NAME = "commonChannel";

    private static final Logger logger = LoggerFactory.getLogger(Program.class);

    public static void main(String[] args) throws Exception {
        final JedisPoolConfig poolConfig = new JedisPoolConfig();
        // The last argument is the timeout; 0 avoids SocketTimeoutException.
        final JedisPool jedisPool = new JedisPool(poolConfig, "localhost", 6379, 0);
        final Jedis subscriberJedis = jedisPool.getResource();
        final Subscriber subscriber = new Subscriber();

        // subscribe() blocks, so it runs on its own thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    logger.info("Subscribing to \"{}\". This thread will be blocked.", CHANNEL_NAME);
                    subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
                    logger.info("Subscription ended.");
                } catch (Exception e) {
                    logger.error("Subscribing failed.", e);
                }
            }
        }).start();

        // Publishing uses a second Jedis instance; start() returns when the user types quit.
        final Jedis publisherJedis = jedisPool.getResource();
        new Publisher(publisherJedis, CHANNEL_NAME).start();

        // Cancel the subscription, then give both instances back to the pool.
        subscriber.unsubscribe();
        jedisPool.returnResource(subscriberJedis);
        jedisPool.returnResource(publisherJedis);
    }
}
Let me explain it. I am using a JedisPool to get Jedis instances. The Jedis class is not thread-safe, but JedisPool is. I am using 2 Jedis instances, one for publishing messages and one for subscribing to channels. I made the subscription on another thread because it is a blocking operation. I also have a very simple Publisher class which reads lines from the console and publishes them on the given channel. Here it is.
package com.basrikahveci.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class Publisher {

    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);

    private final Jedis publisherJedis;
    private final String channel;

    public Publisher(Jedis publisherJedis, String channel) {
        this.publisherJedis = publisherJedis;
        this.channel = channel;
    }

    public void start() {
        logger.info("Type your message (quit to terminate)");
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                String line = reader.readLine();
                // readLine() returns null at end of input; treat that like quit.
                if (line == null || "quit".equals(line)) {
                    break;
                }
                publisherJedis.publish(channel, line);
            }
        } catch (IOException e) {
            // Pass the exception as an argument so that the stack trace is logged.
            logger.error("IO failure while reading input", e);
        }
    }
}
I simply take the user's input from the console and publish it on a channel via the Jedis client. When the user types quit, I cancel the subscription and that triggers the blocked subscription thread to continue.
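To make the blocking behaviour concrete without a running Redis server, here is a toy in-memory sketch of the same sequence: subscribe on a separate thread, publish from the main thread, then unsubscribe to release the blocked thread. This is not Jedis code. ToyChannel and all of its methods are hypothetical names I made up for the illustration, and unlike real Redis pub / sub the toy queue keeps messages that were published before the subscriber started listening.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Toy stand-in for a blocking subscription (hypothetical, not part of Jedis).
class ToyChannel {
    // Unique sentinel object; compared by identity so a published "stop" text is still a normal message.
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> received = new CopyOnWriteArrayList<>();

    // Blocks the calling thread until unsubscribe() is called.
    void subscribe() {
        try {
            while (true) {
                String message = queue.take();   // waits while the queue is empty
                if (message == STOP) {
                    return;                      // the blocked thread continues from here
                }
                received.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // restore the flag and stop listening
        }
    }

    void publish(String message) {
        queue.add(message);
    }

    // Called from another thread to end the blocking subscribe() call.
    void unsubscribe() {
        queue.add(STOP);
    }

    // Same order of steps as Program.main: subscribe on a thread, publish, unsubscribe.
    static List<String> demo(String... messages) {
        ToyChannel channel = new ToyChannel();
        Thread subscriberThread = new Thread(() -> channel.subscribe());
        subscriberThread.start();
        for (String message : messages) {
            channel.publish(message);
        }
        channel.unsubscribe();
        try {
            subscriberThread.join();             // returns only after subscribe() has ended
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(channel.received);
    }

    public static void main(String[] args) {
        System.out.println(demo("hello", "world")); // prints [hello, world]
    }
}
```

The point to take away is that the thread calling subscribe() cannot end the subscription itself, because it is stuck waiting. Some other thread has to do it, which is why Program calls subscriber.unsubscribe() from the main thread.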
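Key points here are:
- A subscribe call blocks its thread, and you end it by calling unsubscribe on the JedisPubSub instance.
- Use JedisPool if you will create many Jedis instances, and give them back to the pool when you are done with them. By the way, the last argument (0) I passed to the JedisPool constructor is the timeout, and I set it to 0 to avoid SocketTimeoutException.
You can get Jedis from here.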
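If you wonder where a SocketTimeoutException could come from, the sketch below reproduces it with plain java.net sockets and no Redis at all. It rests on an assumption I have not verified against the Jedis 2.0.0 source: that the timeout passed to JedisPool ends up as the read timeout of the underlying socket. ReadTimeoutDemo and readTimesOut are hypothetical names used only here. A subscriber connection sits idle until a message arrives, just like the silent connection in this example.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class: shows a read timing out on a connection that stays silent.
class ReadTimeoutDemo {

    // Connects to a local server that never sends anything and tries to read one byte.
    // Returns true if the read gave up with a SocketTimeoutException.
    // Do not pass 0 here: with Socket.setSoTimeout(0) the read would wait forever.
    static boolean readTimesOut(int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            client.setSoTimeout(timeoutMillis);      // read timeout in milliseconds
            try {
                client.getInputStream().read();      // blocks, because nothing is ever written
                return false;
            } catch (SocketTimeoutException e) {
                return true;                         // the idle read was cut off by the timeout
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readTimesOut(200)); // prints true
    }
}
```

With java.net.Socket, a timeout of 0 means an infinite wait, which is the behaviour a long-lived, mostly idle subscription needs.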