To run an object in a thread:
-
Implement
Runnable
'srun()
andstart()
in a newThread
(preferred) -
Extend
Thread
, overriderun()
, instantiate, andstart()
-
Implement
-
Threads can:
join
,interrupt
,sleep
,yield
, be prioritized, stack-dumped, be enumerated, grouped, etc.
public class ThreadDemo { public static void main(String[] args) throws InterruptedException { int numThreads = Integer.parseInt(args[0]); int count = Integer.parseInt(args[1]); Thread[] threads = new Thread[numThreads]; System.out.println("Creating threads"); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(new Runner("Runner " + i, count)); } System.out.println("Starting threads"); for (int i = 0; i < threads.length; i++) { threads[i].start(); } System.out.println("Waiting for threads"); for (int i = 0; i < threads.length; i++) { threads[i].join(); } System.out.println("Done"); } } public class Runner implements Runnable { private final String name; private final int count; public Runner(String name, int count) { this.name = name; this.count = count; } public void run() { for (int i = 0; i < count; i++) { System.out.println(name + "=" + i); Thread.yield(); } } }
Use
synchronized
keyword to protect critical sections from concurrent access- Do not allow data/structures to be modified by multiple threads simultaneously
- Can be used on methods or blocks of code
Synchronization occurs on an object that acts as a monitor for the running threads
- Usually, this is the object that is to be protected
-
Can be any other object (even generic instances of
Object
):Object lock = new Object();
public class Counter { private int count = 0; public void setCount(int count) { this.count = count; } public int getCount() { return this.count; } } public class Runner implements Runnable { private final int maxCount; private final Counter counter; public Runner(int maxCount, Counter counter) { this.maxCount = maxCount; this.counter = counter; } public void run() { for (int i = 0; i < this.maxCount; i++) { int currentCount; synchronized(this.counter) { currentCount = this.counter.getCount(); currentCount++; this.counter.setCount(currentCount); } System.out.println(Thread.currentThread().getName() + " at count " + currentCount); } } }
A better approach would be to add a synchronized count method on the Counter class so that Runners would not have to worry about synchronization.
public synchronized int count() { return ++this.count; }
Use
Object.wait()
,Object.notify()
, andObject.notifyAll()
to schedule threads (wait on each other)- Solves the classic Consumer-Producer problem
-
Methods
wait()
,notify()
, andnotifyAll()
must be called insynchronized
blocks -
Always do checks around
wait()
usingwhile
orfor
loops rather than simpleif
conditions -
Better yet, use Java 5’s
java.util.concurrent
advanced thread-safe data structures and controls
public class Queue { private static class Element { final Object value; Element next; Element(Object value) {this.value = value;} } private Element first, last; private int curSize, maxSize; public Queue(int maxSize) { this.maxSize = maxSize; } public synchronized void put(Object o) throws InterruptedException { while (this.curSize == this.maxSize) { this.wait(); } if (this.first == null) { this.first = (this.last = new Element(o)); } else { this.last = (this.last.next = new Element(o)); } this.curSize++; this.notifyAll(); } public synchronized Object get() throws InterruptedException { while (this.curSize == 0) { this.wait(); } Object o = this.first.value; this.first = this.first.next; this.curSize--; this.notifyAll(); return o; } }
The Java 5’s
java.util.concurrent
package provides a powerful, extensible framework of high-performance, scalable, thread-safe building blocks for developing concurrent classes and applications- Thread pools and custom execution frameworks
- Queues and related thread-safe collections
- Atomic variables
- Special-purpose locks, barriers, semaphores and condition variables
-
Better than
synchronized
,wait()
,notify()
, …
- Its packages free the programmer from having to write these utilities by hand, like Collections did for data structures
- The packages also provide low-level primitives for advanced concurrent programming
Built-in concurrency primitives :
wait()
,notify()
,synchronized
are too hard:- Hard to use
- Easy to make errors
- Low level for most applications
- Poor performance if used incorrectly
- Too much wheel-reinventing (not standardized)
Concurrency APIs in Java 5
-
java.util.concurrent
- Utility classes and frameworks commonly used in concurrent programming - e.g. thread pools -
java.util.concurrent.atomic
- Classes that support lock-free and thread-safe operations on single variables - e.g. atomici++
-
java.util.concurrent.locks
- Framework for locking and waiting for conditions that is distinct from built-in synchronization and monitors
-
public class NetworkedService { public void service(int port) throws Exception { ServerSocket ss = new ServerSocket(port); while(true) { new Thread(new Handler(ss.accept())).start(); //} } private static class Handler implements Runnable { private final Socket s; public Handler(Socket s) { this.s = s; } public void run() { //
} } }
![]() | Note |
---|---|
With no access to a standard thread-pool library, we end up creating Threads on demand. Works well for very simple cases, but does not scale and it is not resource-efficient. |
public class NetworkedService { public void service(int port) throws Exception { ServerSocket ss = new ServerSocket(port); ExecutorService pool = Executors.newCachedThreadPool(); //while(true) { pool.execute(new Handler(ss.accept())); //
} } private static class Handler implements Runnable { //
} }
Pick a pool scheduling strategy that fits our needs | |
Let the executor service manage the threads as needed | |
Same implementation as before |
- Java 5’s Scheduling framework standardizes invocation, scheduling, execution, and control of asynchronous tasks
- We can use its thread-pool executor service to provide better resource utilization
Executor interface decouples task submission from the mechanics of how it will be run:
public interface Executor { public void execute(Runnable command); }
Very easy to build different execution strategies:
public class InThreadExecutor implements Executor { public void execute(Runnable r) { r.run(); } } public class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
-
ExecutorService
builds uponExecutor
to provide lifecycle management and support for tracking progress of asynchronous tasks -
Constructed through through factory methods on
Executors
class ScheduledExecutorService
provides a framework for executing deferred and recurring tasks that can be cancelled-
Like
Timer
but supports pooling
-
Like
public interface ExecutorService extends Executor { /** Stop accepting new tasks */ public void shutdown(); /** Stop accepting new tasks, cancel tasks currently waiting, and interrupt active tasks */ public List<Runnable> shutdownNow(); public boolean isShutdown(); public boolean isTerminated(); public boolean awaitTermination(long timeout, TimeUnit unit); public void execute(Runnable c); public <T> Future<T> submit(Callable<T> task); ... } public class Executors { public static ExecutorService newSingleThreadedExecutor(); public static ExecutorService newFixedThreadPool(int n); public static ExecutorService newCachedThreadPool(); ... }
- Getting results from operations that run in separate threads requires a lot of boiler-plate code
-
We need a custom
Runnable
that remembers its result (or exception) as its state and we explicit access to it -
What if we needed to wait on the result? We need to
join()
the thread (boring)
public class Handler implements Runnable { private Exception e = null; private Long result = null; public synchronized void run() { long t = System.currentTimeMillis(); try { // handle request this.result = System.currentTimeMillis() - t; } catch (Exception e) { this.e = e; } finally { this.notifyAll(); } } public synchronized long getResult() throws Exception { while (result == null && e == null) { this.wait(); } if (e != null) { throw e; } else { return result; } } }
Java 5 provides a new interface to define a runnable task that returns a result and may throw an exception:
public interface Callable<V> { public V call() throws Exception; }
Future
- the result of an async action- An easy way to get the task status, block and wait for its result, or cancel it
Our Handler becomes much simpler with a Callable:
public class Handler implements Callable<Long> { public Long call() throws Exception { long t = System.currentTimeMillis(); //return System.currentTimeMillis() - t; } }
Futures make it easy to define asynchronous operations:
public interface Future<V> { public V get() throws InterruptedExeption,ExecutionException; public V get(long timeout, TimeUnit unit); public boolean cancel(boolean mayInterrupt); public boolean isCanceled(); public boolean isDone(); }
And ExecutorService.submit(Callable<T> task)
conveniently returns Future<T>
for that task.
We just have to call Future.get()
to wait on the result.
Java’s
synchronized
construct forces all lock acquisition and release to occur in a block-structured way- Cannot easily implement chain-locking
- Often used inefficiently
-
Using
Object.wait()
andObject.notify()
for thread sync is extremely hard to do well
public class BlockingMap<K,V> { private final Map<K,V> map = new HashMap<K,V>(); // only one get() a time - poor read performance public synchronized V get(K key) { return map.get(key); } public synchronized V require(K key) { V value = null; while((value = map.get(key)) == null) { try { this.wait(); } catch (InterruptedException e) { break; } } return value; } public synchronized void set(K key, V value) { map.put(key, value); this.notifyAll(); } }
- Locks and condition variables provide an alternative to implicit object monitors
-
ReadWriteLock
allows concurrent read access to a shared resource - Possible to provide deadlock detection and non-reentrant usage semantics
Support for guaranteed fair-ordering
- Access to longest-waiting thread
-
But with increased flexibility, comes additional responsibility (use
try
-finally
to unlock)
public class BlockingMap<K,V> { private Map<K,V> map = ... private ReadWriteLock lock = new ReentrantReadWriteLock(); private Condition vAdded = lock.writeLock().newCondition(); public V get(K key) { lock.readLock().lock(); try { return map.get(key); } finally { lock.readLock().unlock(); } } public V require(K key) { V value = get(key); // try cheap first if (value == null) { lock.writeLock().lock(); try { while((value = map.get(key)) == null) { vAdded.await(); } } catch (InterruptedException giveup) { ... } finally { lock.writeLock().unlock(); } } return value; } public void set(String key, String value) { lock.writeLock().lock(); try { map.put(key, value); vAdded.signalAll(); } finally { lock.writeLock().unlock(); } } }
- We can do chain-locking (e.g. support for data traversal algorithms)
-
Non-blocking lock attempts:
aLock.tryLock();
-
Interrupted lock attempts:
aLock.lockInterruptibly();
-
Timing out lock attempts:
aLock.tryLock(5, TimeUnit.SECONDS);
if (lock.tryLock()) { try { // do work that requires a lock } finally { lock.unlock(); } } else { // the lock was not available } try { lock.lockInterruptibly(); try { // do work that requires a lock } finally { lock.unlock(); } } catch (InterruptedException e) { // interrupted while waiting } try { if (lock.tryLock(10, TimeUnit.SECONDS)) { try { // do work that requires a lock } finally { lock.unlock(); } } } catch (InterruptedException e) { // interrupted while waiting }
- Conditions provide means for one thread to suspend its execution until notified by another
-
What locks are to
synchronized
, Conditions are to theObject
class’s monitor methods (wait
,notify
, andnotifyAll
) - Support for multiple conditions per lock, uninterruptible conditions, time-wait conditions, absolute time waits
-
Support for uninterruptible conditions:
Condition.awaitUninterruptibly()
-
Timed waits on conditions tell you why it returned:
Condition.await(long time, TimeUnit unit)
returnsfalse
if time elapsed -
Support for absolute time waits:
Condition.awaitUntil(Date deadline)
-
Prior to Java 5, operations such as
i++
had to be wrapped insynchronized
if thread safety was important Now we have atomic "primitives"
- Easier and more efficient
- Can be used in arrays/collections
- Building blocks for non-blocking data structures
public class AtomicInteger extends Number { public int incrementAndGet() {...} public int decrementAndGet() {...} public int getAndIncrement() {...} public int getAndDecrement() {...} ... } public class AtomicLong extends Number { public boolean compareAndSet(long expected, long update) {...} public long addAndGet(long delta) {...} ... } public class AtomicBoolean { public boolean getAndSet(boolean newValue) {...} ... }
Concurrent collections offer thread-safe iteration and better performance than
HashTable
andVector
-
ConcurrentHashMap
-
CopyOnWriteArrayList
-
-
General purpose synchronizers:
Latch
,Semaphore
,Barrier
,Exchanger
-
Nanosecond-granularity timing:
aLock.tryLock(250, TimeUnit.NANOSECONDS)
-
java.util.concurrent.Semaphore
- Counting semaphore maintains a set of permits and is generally used to restrict number of threads that can access a resource -
java.util.concurrent.CyclicBarrier
- Aids a set of threads to wait for each other to reach a common “barrier” point -
java.util.concurrent.CountDownLatch
– Allows a set of threads to wait for some number of operations performed by other thread(s) to complete -
java.util.concurrent.Exchanger<V>
- A synchronization point at which a pair of threads can exchange objects