Java Fundamentals Tutorial: Threading

14. Threading

14.1. Runnable And Thread

  • To run an object in a thread:

    • Implement Runnable's run() method, pass the instance to a new Thread, and call start() (preferred)
    • Extend Thread, override run(), instantiate, and start()
  • Threads can also be joined, interrupted, put to sleep, made to yield, prioritized, stack-dumped, enumerated, grouped, etc.

public class ThreadDemo {
  public static void main(String[] args) throws InterruptedException {
    int numThreads = Integer.parseInt(args[0]);
    int count = Integer.parseInt(args[1]);
    Thread[] threads = new Thread[numThreads];
    System.out.println("Creating threads");
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(new Runner("Runner " + i, count));
    }
    System.out.println("Starting threads");
    for (int i = 0; i < threads.length; i++) { threads[i].start(); }
    System.out.println("Waiting for threads");
    for (int i = 0; i < threads.length; i++) { threads[i].join(); }
    System.out.println("Done");
  }
}
public class Runner implements Runnable {
  private final String name;
  private final int count;
  public Runner(String name, int count) {
    this.name = name; this.count = count;
  }
  public void run() {
    for (int i = 0; i < count; i++) {
      System.out.println(name + "=" + i);
      Thread.yield();
    }
  }
}
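
For comparison, the second option from the list above (extending Thread) might look like the following minimal sketch. The class name CountingThread, its fields, and the runAll() helper are illustrative assumptions and are not part of the original example.

```java
// Hypothetical example: class, field, and method names are illustrative.
class CountingThread extends Thread {
  private final int count;
  private int completed = 0; // read by the caller only after join()

  CountingThread(String name, int count) {
    super(name); // Thread(String) sets the thread's name
    this.count = count;
  }

  @Override
  public void run() {
    for (int i = 0; i < count; i++) {
      completed++;
    }
  }

  int getCompleted() { return completed; }

  /** Starts numThreads threads and returns the total number of iterations. */
  static int runAll(int numThreads, int count) throws InterruptedException {
    CountingThread[] threads = new CountingThread[numThreads];
    for (int i = 0; i < numThreads; i++) {
      threads[i] = new CountingThread("Counter " + i, count);
      threads[i].start();
    }
    int total = 0;
    for (CountingThread t : threads) {
      t.join(); // after join() returns, the thread's writes are visible here
      total += t.getCompleted();
    }
    return total;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Total iterations: " + runAll(3, 5));
  }
}
```

Implementing Runnable is usually preferred because it keeps the task separate from the thread that runs it and leaves the class free to extend something else.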

14.2. Thread Synchronization

  • Use the synchronized keyword to protect critical sections from concurrent access

    • Do not allow data/structures to be modified by multiple threads simultaneously
    • Can be used on methods or blocks of code
  • Synchronization occurs on an object that acts as a monitor for the running threads

    • Usually, this is the object that is to be protected
    • Can be any other object (even generic instances of Object): Object lock = new Object();

public class Counter {
  private int count = 0;
  public void setCount(int count) { this.count = count; }
  public int getCount() { return this.count; }
}
public class Runner implements Runnable {
  private final int maxCount;
  private final Counter counter;
  public Runner(int maxCount, Counter counter) {
    this.maxCount = maxCount; this.counter = counter;
  }
  public void run() {
    for (int i = 0; i < this.maxCount; i++) {
      int currentCount;
      synchronized(this.counter) {
        currentCount = this.counter.getCount();
        currentCount++;
        this.counter.setCount(currentCount);
      }
      System.out.println(Thread.currentThread().getName()
        + " at count " + currentCount);
    }
  }
}

A better approach would be to add a synchronized count method on the Counter class so that Runners would not have to worry about synchronization.

public synchronized int count() {
  return ++this.count;
}
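
As a rough illustration of that suggestion, the sketch below moves the locking into the counter itself and checks that no increments are lost. SafeCounter and SafeCounterDemo are hypothetical names, and the lambda syntax assumes Java 8 or later.

```java
// Hypothetical example: a counter that hides its own synchronization.
class SafeCounter {
  private int count = 0;

  public synchronized int count() {
    return ++this.count;
  }

  public synchronized int getCount() {
    return this.count;
  }
}

class SafeCounterDemo {
  /** Increments one shared counter from several threads and returns the final value. */
  static int run(int numThreads, int perThread) throws InterruptedException {
    SafeCounter counter = new SafeCounter();
    Thread[] threads = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          counter.count(); // no synchronized block needed in the caller
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return counter.getCount();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Final count: " + run(4, 10000));
  }
}
```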

14.3. More Thread Synchronization

  • Use Object.wait(), Object.notify(), and Object.notifyAll() to coordinate threads that need to wait on each other

    • Solves the classic Consumer-Producer problem
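  • Methods wait(), notify(), and notifyAll() must be called while holding the monitor of the object they are invoked on (that is, inside a synchronized method or block on that object); otherwise they throw IllegalMonitorStateException
  • Always check the condition around wait() in a while or for loop rather than a simple if, because a thread can wake up spuriously or find that the condition has changed again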
  • Better yet, use Java 5’s java.util.concurrent advanced thread-safe data structures and controls

public class Queue {
  private static class Element {
    final Object value;
    Element next;
    Element(Object value) {this.value = value;}
  }
  private Element first, last;
  private int curSize, maxSize;

  public Queue(int maxSize) { this.maxSize = maxSize; }

  public synchronized void put(Object o) throws InterruptedException {
    while (this.curSize == this.maxSize) { this.wait(); }
    if (this.first == null) {
      this.first = (this.last = new Element(o));
    } else {
      this.last = (this.last.next = new Element(o));
    }
    this.curSize++;
    this.notifyAll();
  }
  public synchronized Object get() throws InterruptedException {
    while (this.curSize == 0) { this.wait(); }
    Object o = this.first.value;
    this.first = this.first.next;
    this.curSize--;
    this.notifyAll();
    return o;
  }
}
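
For comparison, the same producer-consumer hand-off can be sketched with a java.util.concurrent.BlockingQueue, which does the waiting and notification internally. This is only an illustration: BlockingQueueDemo and transfer() are hypothetical names, the capacity of 2 is arbitrary, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical example: producer and consumer sharing a bounded queue.
class BlockingQueueDemo {
  /** Sends the numbers 1..items through the queue and returns their sum. */
  static int transfer(int items) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);
    int[] sum = new int[1]; // written only by the consumer thread

    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= items; i++) {
          queue.put(i); // blocks while the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < items; i++) {
          sum[0] += queue.take(); // blocks while the queue is empty
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join(); // makes the consumer's writes to sum visible
    return sum[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Sum received: " + transfer(10));
  }
}
```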

14.4. Java 5 Concurrency Features

  • Java 5’s java.util.concurrent package provides a powerful, extensible framework of high-performance, scalable, thread-safe building blocks for developing concurrent classes and applications

    • Thread pools and custom execution frameworks
    • Queues and related thread-safe collections
    • Atomic variables
    • Special-purpose locks, barriers, semaphores and condition variables
  • Generally a better choice than hand-written code based on synchronized, wait(), notify(), …

  • Its packages free the programmer from having to write these utilities by hand, much as the Collections framework did for data structures
  • The packages also provide low-level primitives for advanced concurrent programming
  • The built-in concurrency primitives (wait(), notify(), synchronized) have drawbacks:

    • Hard to use
    • Easy to make errors
    • Low level for most applications
    • Poor performance if used incorrectly
    • Too much wheel-reinventing (not standardized)
  • Concurrency APIs in Java 5

    • java.util.concurrent - Utility classes and frameworks commonly used in concurrent programming - e.g. thread pools
    • java.util.concurrent.atomic - Classes that support lock-free and thread-safe operations on single variables - e.g. atomic i++
    • java.util.concurrent.locks - Framework for locking and waiting for conditions that is distinct from built-in synchronization and monitors

14.5. Thread Scheduling Pre-Java 5

import java.net.ServerSocket;
import java.net.Socket;

public class NetworkedService {
  public void service(int port) throws Exception {
    ServerSocket ss = new ServerSocket(port);
    while(true) {
      new Thread(new Handler(ss.accept())).start(); //1
    }
  }
  private static class Handler implements Runnable {
    private final Socket s;
    public Handler(Socket s) { this.s = s; }
    public void run() {
      //2
    }
  }
}

(1) New thread every time - poor resource management

(2) Read requests, access registry, return response

Note

With no access to a standard thread-pool library, we end up creating threads on demand. This works for very simple cases, but it does not scale and is not resource-efficient.

14.6. Thread Scheduling In Java 5

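import java.net.ServerSocket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NetworkedService {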
  public void service(int port) throws Exception {
    ServerSocket ss = new ServerSocket(port);
    ExecutorService pool = Executors.newCachedThreadPool(); //1

    while(true) {
      pool.execute(new Handler(ss.accept())); //2
    }
  }
  private static class Handler implements Runnable {
    //3
  }
}

(1) Pick a pool scheduling strategy that fits our needs

(2) Let the executor service manage the threads as needed

(3) Same implementation as before

  • Java 5’s Scheduling framework standardizes invocation, scheduling, execution, and control of asynchronous tasks
  • We can use its thread-pool executor service to provide better resource utilization
  • Executor interface decouples task submission from the mechanics of how it will be run:

    public interface Executor {
      public void execute(Runnable command);
    }
  • Very easy to build different execution strategies:

    public class InThreadExecutor implements Executor {
      public void execute(Runnable r) {
        r.run();
      }
    }
    
    public class ThreadPerTaskExecutor implements Executor {
      public void execute(Runnable r) {
        new Thread(r).start();
      }
    }
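
To see the decoupling in action, the sketch below submits the same task through two strategies equivalent to the ones above and reports which thread ran it. ExecutorStrategyDemo and ranInCallerThread() are hypothetical names, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical example: the submitting code stays the same, only the Executor changes.
class ExecutorStrategyDemo {
  /** Runs a task through the executor and reports whether it ran on the caller's thread. */
  static boolean ranInCallerThread(Executor executor) throws InterruptedException {
    Thread caller = Thread.currentThread();
    CountDownLatch done = new CountDownLatch(1);
    AtomicBoolean sameThread = new AtomicBoolean();
    executor.execute(() -> {
      sameThread.set(Thread.currentThread() == caller);
      done.countDown();
    });
    done.await(); // wait until the task has finished, wherever it ran
    return sameThread.get();
  }

  public static void main(String[] args) throws InterruptedException {
    Executor inThread = r -> r.run();               // like InThreadExecutor
    Executor perTask = r -> new Thread(r).start();  // like ThreadPerTaskExecutor
    System.out.println("in-thread executor ran in caller: " + ranInCallerThread(inThread));
    System.out.println("thread-per-task executor ran in caller: " + ranInCallerThread(perTask));
  }
}
```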

14.7. Java 5’s ExecutorService

  • ExecutorService builds upon Executor to provide lifecycle management and support for tracking progress of asynchronous tasks
  • Constructed through factory methods on the Executors class
  • ScheduledExecutorService provides a framework for executing deferred and recurring tasks that can be cancelled

    • Like Timer but supports pooling
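
A recurring task that is later cancelled might be sketched as follows. HeartbeatDemo, runTicks(), and the 10 ms period are illustrative assumptions; the lambda assumes Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical example: run a periodic task a few times, then cancel it.
class HeartbeatDemo {
  /** Waits for the given number of ticks, cancels the task, and reports whether the cancel succeeded. */
  static boolean runTicks(int ticks) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    CountDownLatch remaining = new CountDownLatch(ticks);

    // First run immediately, then every 10 milliseconds.
    ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
        () -> remaining.countDown(), 0, 10, TimeUnit.MILLISECONDS);

    remaining.await();                        // block until enough ticks have happened
    boolean cancelled = handle.cancel(false); // stop future runs without interrupting
    scheduler.shutdown();
    return cancelled;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Cancelled after 3 ticks: " + runTicks(3));
  }
}
```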

public interface ExecutorService extends Executor {
  /** Stop accepting new tasks */
  public void shutdown();
  /** Stop accepting new tasks, cancel tasks
      currently waiting, and interrupt active tasks */
  public List<Runnable> shutdownNow();
  public boolean isShutdown();
  public boolean isTerminated();
  public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
  public void execute(Runnable c);
  public <T> Future<T> submit(Callable<T> task);
  ...
}

public class Executors {
  public static ExecutorService newSingleThreadExecutor();
  public static ExecutorService newFixedThreadPool(int n);
  public static ExecutorService newCachedThreadPool();
  ...
}
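
A typical lifecycle (create a pool, submit work, shut down, wait for termination) could look like the sketch below. PoolLifecycleDemo, the pool size of 2, and the 5-second timeout are assumptions made for illustration; the lambda assumes Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: submit tasks, then shut the pool down in an orderly way.
class PoolLifecycleDemo {
  /** Runs numTasks trivial tasks on a small pool and returns how many completed. */
  static int runTasks(int numTasks) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < numTasks; i++) {
      pool.execute(() -> completed.incrementAndGet());
    }
    pool.shutdown(); // stop accepting new tasks; already-submitted tasks still run
    if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
      pool.shutdownNow(); // give up waiting and interrupt whatever is still running
    }
    return completed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Completed tasks: " + runTasks(10));
  }
}
```

Without the call to shutdown(), the pool's worker threads would keep the JVM alive after main() returns.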

14.8. Getting Results From Threads Pre-Java 5

  • Getting results from operations that run in separate threads requires a lot of boiler-plate code
  • We need a custom Runnable that remembers its result (or exception) as its state, and we need explicit access to that Runnable to read it
  • What if we need to wait for the result? We have to join() the thread or hand-code the waiting with wait() and notifyAll() (tedious)

public class Handler implements Runnable {
  private Exception e = null;
  private Long result = null;
  public synchronized void run() {
    long t = System.currentTimeMillis();
    try {
      // handle request
      this.result = System.currentTimeMillis() - t;
    } catch (Exception e) {
      this.e = e;
    } finally {
      this.notifyAll();
    }
  }
  public synchronized long getResult() throws Exception {
    while (result == null && e == null) {
      this.wait();
    }
    if (e != null) {
      throw e;
    } else {
      return result;
    }
  }
}

14.9. Getting Results From Threads In Java 5

  • Java 5 provides a new interface to define a runnable task that returns a result and may throw an exception:

    public interface Callable<V> {
      public V call() throws Exception;
    }
  • Future - the result of an async action

    • An easy way to get the task status, block and wait for its result, or cancel it

Our Handler becomes much simpler with a Callable:

public class Handler implements Callable<Long> {
  public Long call() throws Exception {
    long t = System.currentTimeMillis(); //1
    return System.currentTimeMillis() - t;
  }
}

(1) Handle request that can throw an Exception

Futures make it easy to define asynchronous operations:

public interface Future<V> {
  public V get() throws InterruptedException, ExecutionException;
  public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
  public boolean cancel(boolean mayInterruptIfRunning);
  public boolean isCancelled();
  public boolean isDone();
}

And ExecutorService.submit(Callable<T> task) conveniently returns Future<T> for that task. We just have to call Future.get() to wait on the result.
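
Putting Callable, submit(), and Future.get() together might look like this sketch. FutureDemo and its two methods are hypothetical; the second method illustrates that an exception thrown inside call() reaches the caller wrapped in an ExecutionException. The lambdas assume Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: getting a result (or a failure) back from a task.
class FutureDemo {
  /** Computes 1 + 2 + ... + n on a pool thread and waits for the result. */
  static int sumTo(int n) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Callable<Integer> task = () -> {
        int sum = 0;
        for (int i = 1; i <= n; i++) {
          sum += i;
        }
        return sum;
      };
      Future<Integer> future = pool.submit(task);
      return future.get(); // blocks until the task is done
    } finally {
      pool.shutdown();
    }
  }

  /** Submits a task that fails and returns the message of the original exception. */
  static String failureMessage() throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      Callable<Integer> failing = () -> {
        throw new IllegalStateException("boom");
      };
      pool.submit(failing).get();
      return null; // not reached: get() rethrows the failure
    } catch (ExecutionException e) {
      return e.getCause().getMessage(); // the exception thrown inside call()
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("Sum: " + sumTo(100));
    System.out.println("Failure: " + failureMessage());
  }
}
```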

14.10. Thread Sync Pre-Java 5

  • Java’s synchronized construct forces all lock acquisition and release to occur in a block-structured way

    • Cannot easily implement chain-locking
    • Often used inefficiently
  • Using Object.wait() and Object.notify() for thread sync is extremely hard to do well

public class BlockingMap<K,V> {
  private final Map<K,V> map = new HashMap<K,V>();

  // only one get() at a time - poor read performance
  public synchronized V get(K key) {
    return map.get(key);
  }
  public synchronized V require(K key) {
    V value = null;
    while((value = map.get(key)) == null) {
      try {
        this.wait();
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        break;
      }
    }
    return value;
  }
  public synchronized void set(K key, V value) {
    map.put(key, value);
    this.notifyAll();
  }
}

14.11. Thread Sync In Java 5 With Locks

  • Locks and condition variables provide an alternative to implicit object monitors
  • ReadWriteLock allows concurrent read access to a shared resource
  • Possible to provide deadlock detection and non-reentrant usage semantics
  • Support for guaranteed fair ordering

    • The lock is granted to the longest-waiting thread
  • But increased flexibility brings additional responsibility (always unlock in a finally block)

public class BlockingMap<K,V> {
  private Map<K,V> map = ...
  private ReadWriteLock lock = new ReentrantReadWriteLock();
  private Condition vAdded = lock.writeLock().newCondition();
  public V get(K key) {
    lock.readLock().lock();
    try {
      return map.get(key);
    } finally {
      lock.readLock().unlock();
    }
  }
  public V require(K key) {
    V value = get(key); // try cheap first
    if (value == null) {
      lock.writeLock().lock();
      try {
        while((value = map.get(key)) == null) {
          vAdded.await();
        }
      } catch (InterruptedException giveup) {
        ...
      } finally {
        lock.writeLock().unlock();
      }
    }
    return value;
  }
  public void set(K key, V value) {
    lock.writeLock().lock();
    try {
      map.put(key, value);
      vAdded.signalAll();
    } finally {
      lock.writeLock().unlock();
    }
  }
}

14.12. Benefits Of Java 5 Locks

  • We can do chain-locking (e.g. support for data traversal algorithms)
  • Non-blocking lock attempts: aLock.tryLock();
  • Interruptible lock attempts: aLock.lockInterruptibly();
  • Timing out lock attempts: aLock.tryLock(5, TimeUnit.SECONDS);

if (lock.tryLock()) {
  try {
    // do work that requires a lock
  } finally {
    lock.unlock();
  }
} else {
  // the lock was not available
}

try {
  lock.lockInterruptibly();
  try {
    // do work that requires a lock
  } finally {
    lock.unlock();
  }
} catch (InterruptedException e) {
  // interrupted while waiting
}

try {
  if (lock.tryLock(10, TimeUnit.SECONDS)) {
    try {
      // do work that requires a lock
    } finally {
      lock.unlock();
    }
  }
} catch (InterruptedException e) {
  // interrupted while waiting
}
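
Chain-locking (also known as hand-over-hand locking) is the one benefit listed above without an example. The sketch below is one possible way to do it on a singly linked list: each node has its own lock, and a traversal acquires the next node's lock before releasing the current one. That release order is not possible with block-structured synchronized. ChainLockedList and its members are hypothetical names.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: a linked list traversed with hand-over-hand locking.
class ChainLockedList {
  private static class Node {
    final int value;
    final Lock lock = new ReentrantLock();
    Node next; // guarded by this node's lock
    Node(int value) { this.value = value; }
  }

  private final Node head = new Node(0); // sentinel node, holds no data

  /** Appends a value, holding at most two node locks at any moment. */
  public void add(int value) {
    Node current = head;
    current.lock.lock();
    try {
      while (current.next != null) {
        Node next = current.next;
        next.lock.lock();      // acquire the next lock first...
        current.lock.unlock(); // ...then release the one behind us
        current = next;
      }
      current.next = new Node(value);
    } finally {
      current.lock.unlock(); // current is always the node whose lock we hold
    }
  }

  /** Searches for a value using the same hand-over-hand traversal. */
  public boolean contains(int value) {
    Node current = head;
    current.lock.lock();
    try {
      while (current.next != null) {
        Node next = current.next;
        next.lock.lock();
        current.lock.unlock();
        current = next;
        if (current.value == value) {
          return true;
        }
      }
      return false;
    } finally {
      current.lock.unlock();
    }
  }

  public static void main(String[] args) {
    ChainLockedList list = new ChainLockedList();
    list.add(1);
    list.add(2);
    System.out.println("contains 2: " + list.contains(2));
    System.out.println("contains 3: " + list.contains(3));
  }
}
```

Because every traversal takes locks in list order, threads working on different parts of the list can proceed in parallel without deadlocking each other.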

14.13. Java 5 Conditions

  • Conditions provide means for one thread to suspend its execution until notified by another
  • What locks are to synchronized, Conditions are to the Object class’s monitor methods (wait, notify, and notifyAll)
  • Support for multiple conditions per lock, uninterruptible waits, timed waits, and absolute-time waits

  • Support for uninterruptible waits: Condition.awaitUninterruptibly()
  • Timed waits on conditions tell you why they returned: Condition.await(long time, TimeUnit unit) returns false if the time elapsed
  • Support for absolute-time waits: Condition.awaitUntil(Date deadline)
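
The classic bounded-buffer pattern shows why multiple conditions per lock are useful: producers wait on one condition and consumers on another, so a signal wakes only a thread that can actually make progress. The sketch below is a minimal version of that pattern; BoundedBuffer and its field names are chosen here for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example: one lock, two conditions.
class BoundedBuffer<T> {
  private final Object[] items;
  private int putIndex, takeIndex, count;
  private final Lock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();
  private final Condition notEmpty = lock.newCondition();

  BoundedBuffer(int capacity) {
    this.items = new Object[capacity];
  }

  public void put(T item) throws InterruptedException {
    lock.lock();
    try {
      while (count == items.length) {
        notFull.await(); // releases the lock while waiting
      }
      items[putIndex] = item;
      putIndex = (putIndex + 1) % items.length;
      count++;
      notEmpty.signal(); // wake one waiting consumer, if any
    } finally {
      lock.unlock();
    }
  }

  @SuppressWarnings("unchecked")
  public T take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0) {
        notEmpty.await();
      }
      T item = (T) items[takeIndex];
      items[takeIndex] = null; // drop the reference so it can be collected
      takeIndex = (takeIndex + 1) % items.length;
      count--;
      notFull.signal(); // wake one waiting producer, if any
      return item;
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BoundedBuffer<String> buffer = new BoundedBuffer<String>(2);
    buffer.put("a");
    buffer.put("b");
    System.out.println(buffer.take() + ", " + buffer.take());
  }
}
```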

14.14. Atomics In Java 5

  • Prior to Java 5, operations such as i++ had to be wrapped in synchronized if thread safety was important
  • Now we have atomic "primitives"

    • Easier and more efficient
    • Can be used in arrays/collections
    • Building blocks for non-blocking data structures

public class AtomicInteger extends Number {
  public int incrementAndGet() {...}
  public int decrementAndGet() {...}
  public int getAndIncrement() {...}
  public int getAndDecrement() {...}
  ...
}
public class AtomicLong extends Number {
  public boolean compareAndSet(long expected,
    long update) {...}
  public long addAndGet(long delta) {...}
  ...
}
public class AtomicBoolean {
  public boolean getAndSet(boolean newValue) {...}
  ...
}
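
Two common uses of these classes are sketched below: a shared counter incremented without any locking, and a compare-and-set retry loop, which is the basic building block of non-blocking algorithms. AtomicDemo, countWithAtomic(), and updateMax() are hypothetical names; the lambda assumes Java 8 or later.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: lock-free counting and a compare-and-set loop.
class AtomicDemo {
  /** Increments one AtomicInteger from several threads and returns the final value. */
  static int countWithAtomic(int numThreads, int perThread) throws InterruptedException {
    AtomicInteger counter = new AtomicInteger();
    Thread[] threads = new Thread[numThreads];
    for (int i = 0; i < numThreads; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          counter.incrementAndGet(); // atomic i++, no synchronized needed
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return counter.get();
  }

  /** Raises max to candidate if candidate is larger; returns the resulting maximum. */
  static int updateMax(AtomicInteger max, int candidate) {
    while (true) {
      int current = max.get();
      if (candidate <= current) {
        return current; // nothing to do
      }
      if (max.compareAndSet(current, candidate)) {
        return candidate; // our update won
      }
      // Another thread changed the value in between: re-read and retry.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Count: " + countWithAtomic(4, 1000));
    AtomicInteger max = new AtomicInteger(5);
    System.out.println("Max after 9: " + updateMax(max, 9));
    System.out.println("Max after 3: " + updateMax(max, 3));
  }
}
```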

14.15. Other Java 5 Concurrency Features

  • Concurrent collections offer thread-safe iteration and better performance than Hashtable and Vector

    • ConcurrentHashMap
    • CopyOnWriteArrayList
  • General-purpose synchronizers: CountDownLatch, Semaphore, CyclicBarrier, Exchanger
  • Nanosecond-granularity timing: aLock.tryLock(250, TimeUnit.NANOSECONDS)

  • java.util.concurrent.Semaphore - A counting semaphore maintains a set of permits and is generally used to restrict the number of threads that can access a resource
  • java.util.concurrent.CyclicBarrier - Allows a set of threads to wait for each other to reach a common “barrier” point
  • java.util.concurrent.CountDownLatch - Allows a set of threads to wait for some number of operations performed by other thread(s) to complete
  • java.util.concurrent.Exchanger<V> - A synchronization point at which a pair of threads can exchange objects
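
As one illustration of two of these synchronizers working together, the sketch below uses a Semaphore to cap how many workers run a section at once and a CountDownLatch to wait for all of them to finish. SynchronizerDemo, maxConcurrent(), and the 5 ms sleep are assumptions made for the example; the lambda and accumulateAndGet() assume Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: Semaphore limits concurrency, CountDownLatch waits for completion.
class SynchronizerDemo {
  /** Returns the highest number of workers seen inside the guarded section at once. */
  static int maxConcurrent(int workers, int permits) throws InterruptedException {
    Semaphore gate = new Semaphore(permits);
    CountDownLatch done = new CountDownLatch(workers);
    AtomicInteger inside = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();

    for (int i = 0; i < workers; i++) {
      new Thread(() -> {
        try {
          gate.acquire(); // blocks while all permits are taken
          try {
            maxSeen.accumulateAndGet(inside.incrementAndGet(), Math::max);
            Thread.sleep(5); // simulate work on the limited resource
          } finally {
            inside.decrementAndGet();
            gate.release();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown(); // signal that this worker is finished
        }
      }).start();
    }

    done.await(); // wait for every worker to count down
    return maxSeen.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Max concurrent workers: " + maxConcurrent(6, 2));
  }
}
```

The returned value never exceeds the number of permits, although it can be lower if the workers happen not to overlap.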