Saturday, 3 August 2013

41: Concurrency Utilities

Learning Objectives
After completing this session, you will be able to:
•  List the Concurrency Utilities
Concurrency Utilities: JSR-166
•  Concurrency utilities enable the development of simple yet powerful multi-threaded
applications. Like the Collections Framework for data structures, they provide a rich set of
ready-made building blocks.
•  One design goal is to beat C performance in high-end server applications.
•  They provide a richer set of concurrency building blocks; wait(), notify(), and synchronized
are too primitive for many tasks.
•  They enhance the scalability, performance, readability, and thread safety of Java
applications.
Why Use Concurrency Utilities?
Concurrency utilities are used because they offer:
•  Reduced programming effort
•  Increased performance
•  Increased reliability: threading hazards such as deadlock, starvation, race conditions, and
excessive context switching are easier to avoid
•  Improved maintainability
•  Increased productivity
Concurrency Utilities
The Concurrency Utilities are:
•  Task Scheduling Framework
•  Callables and Futures
•  Synchronizers
•  Concurrent Collections
•  Atomic Variables
•  Locks
•  Nanosecond-granularity timing
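As a minimal sketch of the last item, the following times a task with System.nanoTime() and converts the result with TimeUnit. The names NanoTimingSketch and timeNanos are illustrative and not part of the original notes, and Java 8 or later is assumed for the lambda syntax.

```java
import java.util.concurrent.TimeUnit;

class NanoTimingSketch {
    // Returns the elapsed time of the task in nanoseconds.
    // System.nanoTime() is only meaningful as a difference between two readings.
    static long timeNanos(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long elapsed = timeNanos(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(20); // stand-in for real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("Elapsed ms: " + TimeUnit.NANOSECONDS.toMillis(elapsed));
    }
}
```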
Task Scheduling Framework
•  The Executor/ExecutorService/Executors framework supports:
o  Standardizing invocation
o  Scheduling
o  Execution
o  Control of asynchronous tasks according to a set of execution policies
•  Executor is an interface
•  ExecutorService extends Executor
•  Executors is a factory class for creating various kinds of ExecutorService
implementations
Executor Interface
•  The Executor interface provides a way of decoupling task submission from execution.
•  Execution covers the mechanics of how each task will be run, including details of thread
use and scheduling.
•  Example:
o  Executor executor = getSomeKindofExecutor();
o  executor.execute(new RunnableTask1());
o  executor.execute(new RunnableTask2());
•  Many Executor implementations impose some sort of limitation on how and when
tasks are scheduled.
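The decoupling described above can be sketched with two tiny Executor implementations that run the same submission code under different execution policies. The class names (ExecutorDecouplingSketch, DirectExecutor, ThreadPerTaskExecutor) and the submitGreeting helper are hypothetical, chosen only for this illustration; Java 8 or later is assumed.

```java
import java.util.concurrent.Executor;

class ExecutorDecouplingSketch {
    // Policy 1: run each task synchronously in the caller's thread.
    static class DirectExecutor implements Executor {
        public void execute(Runnable command) {
            command.run();
        }
    }

    // Policy 2: start a new thread for every task.
    static class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }

    // The submitting code is identical regardless of the policy behind the interface.
    static void submitGreeting(Executor executor) {
        executor.execute(() ->
            System.out.println("Running in " + Thread.currentThread().getName()));
    }

    public static void main(String[] args) {
        submitGreeting(new DirectExecutor());        // runs in the calling thread
        submitGreeting(new ThreadPerTaskExecutor()); // runs in a freshly started thread
    }
}
```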
Executor and ExecutorService
ExecutorService adds lifecycle management:
public interface Executor {
    void execute(Runnable command);
}
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // other convenience methods for submitting tasks
}
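A typical use of these lifecycle methods might look like the sketch below: submit work, call shutdown() to stop accepting new tasks, then wait a bounded time with awaitTermination() and fall back to shutdownNow(). The class name ShutdownSketch, the pool size, and the five-second timeout are assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownSketch {
    // Runs taskCount trivial tasks on a small pool, shuts the pool down,
    // and returns how many tasks completed.
    static int runAndShutdown(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown(); // no new tasks are accepted; queued tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // timed out: attempt to cancel what is left
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runAndShutdown(10));
    }
}
```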
Creating ExecutorService From Executors
public class Executors {
    static ExecutorService newSingleThreadExecutor();
    static ExecutorService newFixedThreadPool(int n);
    static ExecutorService newCachedThreadPool();
    static ScheduledExecutorService newScheduledThreadPool(int n);
    // additional versions specifying ThreadFactory
    // additional utility methods
}
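As one illustration of these factory methods, the sketch below obtains a ScheduledExecutorService from Executors.newScheduledThreadPool() and runs a single delayed task. The class name ScheduledSketch, the method runAfterDelay, and the delay value are hypothetical; Java 8 or later is assumed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduledSketch {
    // Schedules a task to run once after delayMillis and waits for its result.
    static String runAfterDelay(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> future =
                scheduler.schedule(() -> "done", delayMillis, TimeUnit.MILLISECONDS);
            return future.get(); // blocks until the delayed task has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAfterDelay(50));
    }
}
```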
Code example of poor resource management (pre-J2SE 5.0 code)
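import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
class WebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            // Don't do this! One new thread per request is unbounded.
            new Thread(r).start();
        }
    }
    // Placeholder: the original does not show how a request is handled.
    static void handleRequest(Socket connection) {
        /* Handle the request */
    }
}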
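Code example for better resource management (using Executors, J2SE 5.0)
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
class WebServer {
    // static, so that it can be used from main(); at most 7 requests run at once
    static final Executor pool = Executors.newFixedThreadPool(7);
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            pool.execute(r);
        }
    }
    // Placeholder: the original does not show how a request is handled.
    static void handleRequest(Socket connection) {
        /* Handle the request */
    }
}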
Callables and Futures: Problem (pre-J2SE 5.0)
Before J2SE 5.0, if an application starts a new thread (the callee), there is no way to return
a result from that thread to the thread that started it (the caller) without using a shared
variable and appropriate synchronization. This is complex and makes code harder to understand and
maintain.
Callables and Futures
•  The callee implements the Callable interface: it implements the call() method rather
than run()
•  The caller submits the Callable object to an ExecutorService using submit(), not
execute(), and then moves on
•  submit() returns a Future object
•  The caller retrieves the result using the get() method of the Future object:
o  If the result is ready, it is returned
o  If the result is not ready, the calling thread blocks until it is
Build CallableExample (This is Callee)
import java.util.concurrent.Callable;
class CallableExample implements Callable<String> {
    public String call() {
        /* Do some work and create a result */
        String result = "The work is ended";
        return result;
    }
}
Future Example (Caller)
ExecutorService es = Executors.newSingleThreadExecutor();
Future<String> f = es.submit(new CallableExample());
/* Do some work in parallel */
try {
    /* Blocks until the result is ready */
    String callableResult = f.get();
} catch (InterruptedException ie) {
    /* Handle */
} catch (ExecutionException ee) {
    /* Handle */
} finally {
    /* Let the executor's thread exit once the task is done */
    es.shutdown();
}
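The callee and caller can also be combined into one small runnable program. The sketch below is only an illustration: the class name FutureSketch, the summing task, and the five-second timeout passed to get() are assumptions, and Java 8 or later is assumed for the lambda.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureSketch {
    // Computes 1 + 2 + ... + n on another thread and returns the result to the caller.
    static int sumInBackground(int n) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            };
            Future<Integer> f = es.submit(task);
            /* The caller is free to do other work here */
            return f.get(5, TimeUnit.SECONDS); // wait at most 5 seconds for the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInBackground(100)); // prints 5050
    }
}
```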
Semaphores
•  Semaphores are typically used to restrict access to a fixed-size pool of resources.
•  A new Semaphore object is created with the same count as the number of resources.
•  A thread trying to access a resource calls the acquire() method:
o  Returns immediately if the semaphore count is greater than zero
o  Blocks if the count is zero until release() is called by a different thread
o  acquire() and release() are thread-safe atomic operations
Semaphore Example
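import java.util.concurrent.Semaphore;
// ResourcePool is a hypothetical name; the original fragment omits the class declaration.
class ResourcePool {
    private Semaphore available;
    private Resource[] resources;
    private boolean[] used;
    public ResourcePool(int poolSize) {
        available = new Semaphore(poolSize);
        /* Initialise resource pool */
    }
    public Resource getResource() {
        try {
            available.acquire();   // blocks until a permit is free
        } catch (InterruptedException ie) {
            /* Handle */
        }
        /* Acquire resource */
    }
    public void returnResource(Resource r) {
        /* Return resource to pool */
        available.release();
    }
}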
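A self-contained version of the same idea is sketched below, with String standing in for the resource type. The class name SemaphorePoolSketch, the helper methods, and the tryGetResource() variant are assumptions added for illustration; the structure loosely follows the pool pattern of one permit per resource.

```java
import java.util.concurrent.Semaphore;

class SemaphorePoolSketch {
    private final Semaphore available;
    private final String[] resources;
    private final boolean[] used;

    SemaphorePoolSketch(int poolSize) {
        available = new Semaphore(poolSize); // one permit per resource
        resources = new String[poolSize];
        used = new boolean[poolSize];
        for (int i = 0; i < poolSize; i++) {
            resources[i] = "resource-" + i;
        }
    }

    // Blocks until a permit is available, then hands out a free resource.
    String getResource() throws InterruptedException {
        available.acquire();
        return markNextUsed();
    }

    // Non-blocking variant: returns null when every resource is in use.
    String tryGetResource() {
        return available.tryAcquire() ? markNextUsed() : null;
    }

    // Returns a resource to the pool and releases its permit.
    void returnResource(String resource) {
        if (markUnused(resource)) {
            available.release();
        }
    }

    // The semaphore only counts permits; the bookkeeping arrays need their own lock.
    private synchronized String markNextUsed() {
        for (int i = 0; i < used.length; i++) {
            if (!used[i]) {
                used[i] = true;
                return resources[i];
            }
        }
        return null; // not reached while permits match the pool size
    }

    private synchronized boolean markUnused(String resource) {
        for (int i = 0; i < resources.length; i++) {
            if (resources[i].equals(resource) && used[i]) {
                used[i] = false;
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        SemaphorePoolSketch pool = new SemaphorePoolSketch(2);
        String first = pool.getResource();
        String second = pool.getResource();
        System.out.println(pool.tryGetResource()); // pool is exhausted here
        pool.returnResource(first);
        System.out.println(pool.tryGetResource()); // a resource is free again
    }
}
```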
BlockingQueue Interface
•  The BlockingQueue interface provides a thread-safe way for multiple threads to manipulate
a collection.
•  ArrayBlockingQueue is the simplest concrete implementation.
•  The main methods are:
o  put() [blocks while the queue is full]
o  offer() [non-blocking, or blocking for a fixed time]
o  peek() [non-blocking]
o  take() [blocks while the queue is empty]
o  poll() [non-blocking, or blocking for a fixed time]
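The non-blocking and timed methods can be tried on a queue of capacity 1, as in this small sketch. The class name QueueMethodsSketch and the demo() helper are illustrative only; the comments show what each call returns.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueMethodsSketch {
    // Exercises offer(), peek() and poll() and records each return value.
    static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // capacity 1
        StringBuilder log = new StringBuilder();
        log.append(queue.offer("a")).append(' '); // true: there was room
        log.append(queue.offer("b")).append(' '); // false: queue is full, no blocking
        log.append(queue.peek()).append(' ');     // a: head is inspected, not removed
        log.append(queue.poll()).append(' ');     // a: head is removed
        log.append(queue.poll()).append(' ');     // null: queue is empty, no blocking
        try {
            // Waits up to 10 ms for an element, then gives up and returns null.
            log.append(queue.poll(10, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: true false a a null null
    }
}
```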
Blocking Queue Example 1
import java.util.concurrent.BlockingQueue;
class Logger implements Runnable {
    private BlockingQueue<String> msgQueue;
    public Logger(BlockingQueue<String> mq) {
        msgQueue = mq;
    }
    public void run() {
        try {
            while (true) {
                /* Blocks until a message is available */
                String message = msgQueue.take();
                /* Log message */
            }
        } catch (InterruptedException ie) {
            /* Handle */
        }
    }
}
Blocking Queue Example 2
private ArrayBlockingQueue<String> messageQueue =
    new ArrayBlockingQueue<String>(10);
Logger logger = new Logger(messageQueue);
public void run() {
    String someMessage;
    try {
        while (true) {
            /* Do some processing that assigns someMessage */
            /* Blocks if no space available */
            messageQueue.put(someMessage);
        }
    } catch (InterruptedException ie) {
        /* Handle */
    }
}
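The two fragments above can be combined into a bounded producer/consumer run. In this sketch the class name ProducerConsumerSketch, the transfer() helper, the message format, and the queue capacity of 2 are assumptions; the consumer stops after a known number of messages so that the program terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerSketch {
    // Sends count messages through a small bounded queue and returns what the consumer received.
    static List<String> transfer(int count) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < count; i++) {
                queue.put("message-" + i); // blocks while the queue is full
            }
            consumer.join(); // after join() the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));
    }
}
```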
Atomics
java.util.concurrent.atomic is a small toolkit of classes that support lock-free, thread-safe
programming on single variables.
AtomicInteger balance = new AtomicInteger(0);
public int deposit(int amount) {
    return balance.addAndGet(amount);
}
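To see the lock-free behaviour under contention, the sketch below increments one AtomicInteger from several threads and adds a compareAndSet() retry loop for a conditional update. The class name AtomicCounterSketch and the methods countConcurrently() and withdraw() are hypothetical additions for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterSketch {
    // Several threads increment the same counter without any explicit lock.
    static int countConcurrently(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    // Conditional update: retry until the compare-and-set succeeds or funds are insufficient.
    static boolean withdraw(AtomicInteger balance, int amount) {
        while (true) {
            int current = balance.get();
            if (current < amount) {
                return false;
            }
            if (balance.compareAndSet(current, current - amount)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1000)); // prints 4000
    }
}
```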
Locks
Lock interface:
•  More extensive locking operations than a synchronized block
•  No automatic unlocking; use try and finally to unlock
•  Non-blocking access using tryLock()
ReentrantLock:
•  Concrete implementation of Lock
•  Holding thread can call lock() multiple times and not block
•  Useful for recursive code
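The points above (unlocking in finally, reentrancy, and tryLock()) can be sketched in one small class. LockSketch and its method names are illustrative assumptions, not part of the original notes.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int value;

    // Standard idiom: lock, then unlock in finally so the lock is always released.
    void increment() {
        lock.lock();
        try {
            value++;
        } finally {
            lock.unlock();
        }
    }

    // Reentrancy: the holding thread can acquire the same lock again without blocking.
    int incrementTwice() {
        lock.lock();
        try {
            increment(); // re-acquires the lock already held by this thread
            increment();
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking attempt: gives up immediately if another thread holds the lock.
    boolean tryIncrement() {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            value++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    int get() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockSketch sketch = new LockSketch();
        sketch.incrementTwice();
        sketch.tryIncrement();
        System.out.println(sketch.get()); // prints 3
    }
}
```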
ReadWriteLock
ReadWriteLock has two locks controlling read and write access:
•  Multiple threads can acquire the read lock if no thread holds the write lock.
•  If a thread holds the read lock, then other threads can acquire the read lock, but no thread
can acquire the write lock.
•  If a thread holds the write lock, then no other thread can acquire the read or write lock.
•  Methods to access locks are rwl.readLock().lock(); and rwl.writeLock().lock();
ReadWrite Lock Example
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class ReadWriteMap {
    final Map<String, Data> m = new TreeMap<String, Data>();
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    final Lock r = rwl.readLock();
    final Lock w = rwl.writeLock();
    // Readers share the read lock
    public Data get(String key) {
        r.lock();
        try { return m.get(key); }
        finally { r.unlock(); }
    }
    // Writers need exclusive access
    public Data put(String key, Data value) {
        w.lock();
        try { return m.put(key, value); }
        finally { w.unlock(); }
    }
    public void clear() {
        w.lock();
        try { m.clear(); }
        finally { w.unlock(); }
    }
}
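The sharing rules listed for ReadWriteLock can be observed directly with tryLock(). In the sketch below, one thread holds the read lock while a second thread probes both locks; the class name ReadWriteLockSketch and the probeWhileReadLocked() helper are assumptions made for the illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLockSketch {
    // While the calling thread holds the read lock, another thread tries both locks.
    // Returns { gotReadLock, gotWriteLock } as seen by the other thread.
    static boolean[] probeWhileReadLocked() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        ExecutorService other = Executors.newSingleThreadExecutor();
        rwl.readLock().lock(); // the calling thread is now a reader
        try {
            Future<boolean[]> probe = other.submit(() -> {
                boolean gotRead = rwl.readLock().tryLock();   // readers can share
                if (gotRead) {
                    rwl.readLock().unlock();
                }
                boolean gotWrite = rwl.writeLock().tryLock(); // writer is excluded by the reader
                if (gotWrite) {
                    rwl.writeLock().unlock();
                }
                return new boolean[] { gotRead, gotWrite };
            });
            return probe.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while probing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            rwl.readLock().unlock();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = probeWhileReadLocked();
        System.out.println("second reader admitted: " + result[0]); // true
        System.out.println("writer admitted: " + result[1]);        // false
    }
}
```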
Try It Out
Problem Statement:
Write a program that illustrates the usage of wait() and notify() methods in controlling access to a
shared resource.
Code:
class ThreadUsingWaitAndNotifyApp {
    public static void main(String args[]) {
        Resource resource = new Resource();
        Thread controller = new Thread(new Controller(resource));
        Thread[] user = new Thread[3];
        for (int i = 0; i < user.length; ++i)
            user[i] = new Thread(new User(i, resource));
        controller.start();
        for (int i = 0; i < user.length; ++i)
            user[i].start();
        boolean alive;
        out: do {
            alive = false;
Summary
•  When a thread goes to sleep, it keeps any locks it holds, so they remain unavailable to
other threads.
•  All three methods, wait(), notify(), and notifyAll(), must be called from within a
synchronized context. A thread invokes wait() or notify() on a particular object, and the
thread must currently hold the lock on that object.
•  Basically, a wait() call means “put me in your waiting pool” or “add me to your waiting list.”
•  The notify() method is used to signal one and only one of the threads that are
waiting in that same object’s waiting pool.
•  The notify() method cannot specify which waiting thread to notify.
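The rules summarized above can be sketched with a simple one-shot gate. The class WaitNotifySketch and its methods are hypothetical; the sketch uses notifyAll() and re-checks its condition in a loop, which is a common defensive idiom rather than something the notes prescribe.

```java
class WaitNotifySketch {
    private boolean ready = false;

    // wait() is called while holding this object's lock; the loop re-checks the condition.
    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {
            wait(); // releases the lock while waiting, re-acquires it before returning
        }
    }

    // notifyAll() is also called while holding the lock.
    synchronized void signalReady() {
        ready = true;
        notifyAll();
    }

    // Starts a waiting thread, signals it, and reports whether it got past the gate.
    static boolean demo() {
        WaitNotifySketch gate = new WaitNotifySketch();
        boolean[] passed = { false };
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.signalReady();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }

    // Calling notify() without holding the object's lock fails at run time.
    static boolean notifyWithoutLockThrows() {
        try {
            new Object().notify();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter released: " + demo());
        System.out.println("unsynchronized notify rejected: " + notifyWithoutLockThrows());
    }
}
```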
Test Your Understanding
1. Name some of the concurrency utilities.
2. Why should you use concurrency utilities?
