Multi-threading in Java


Post by Ursego » 05 Mar 2019, 07:46

To see the keywords colored, save the following text in a text file and open it in a Java IDE or a syntax-highlighting editor (for example, in Notepad++ select in the menu: Language > J > Java).

// A thread is the smallest unit of execution that can be scheduled by the operating system. A process is a group of associated threads that execute in the same, shared environment. By shared environment, we mean that the threads in the same process share the same memory space and can communicate directly with one another. For example, if one thread updates the value of a static object, then this information is immediately available for other threads within the process to read. A task is a single unit of work performed by a thread. A thread can complete multiple independent tasks but only one task at a time.

// A SYSTEM THREAD is created by the JVM and runs in the background of the application. For example, garbage-collection. For the most part, the execution of system-defined threads is invisible to the developer. When a system-defined thread encounters a problem and cannot recover, such as running out of memory, it generates a Java Error, as opposed to an Exception.

// A USER-DEFINED THREAD is one created by the application developer to accomplish a specific task. For example, the thread that runs the main() method. For simplicity, we commonly refer to an application that contains only a single user-defined thread as a single-threaded application, since we are often uninterested in the system threads.

// ####### run():

// The action happens in the run() method, defined in the Runnable interface. Think of the code you want to execute in a separate thread as the job to do. In other words, you have some work that needs to be done - say, downloading stock prices in the background while other things are happening in the program - so what you really want is that job to be executed in its own thread. So, if the work you want done is the job, the one doing the work (actually executing the job code) is the thread. And the job always starts from a run() method, as follows:
public void run() {/* job code */}
// You always write the code that needs to be run in a separate thread in a run() method. Don't call run() programmatically: a direct call simply executes the job on the current thread. Instead, call start(), which creates the new thread and invokes run() on it. The run() method will call other methods, of course, but the thread of execution - the new call stack - always begins by invoking run(). So where does the run() method go? In one of the two classes you can use to define your thread job. You can define and instantiate a thread in one of two ways:
// * WAY 1: Extend the java.lang.Thread class. It implements the java.lang.Runnable interface with a run() that does nothing by default, so override run() in your subclass. But if you extend Thread, you can’t extend anything else!
// * WAY 2: Implement the java.lang.Runnable interface in your class; then, pass an object of your class as a parameter to the constructor of an object of type Thread.

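// WAY 1:

// ReadInventoryThread is a hypothetical class name used only for illustration:
public class ReadInventoryThread extends Thread {
   @Override
   public void run() { System.out.println("Printing zoo inventory"); } // the job code
}
// The following code would then create a thread and start it running:
ReadInventoryThread p = new ReadInventoryThread();
p.start(); // start() creates the new thread, which then calls run()

// A polling example: the main thread repeatedly checks a counter that another thread is updating. Note that this class does not extend Thread; it passes a Runnable lambda to the Thread constructor (WAY 2, described below):
public class CheckResults {
   private static volatile int counter = 0; // volatile, so that the main thread is guaranteed to see the other thread's updates
   public static void main(String[] args) throws InterruptedException {
      Runnable runnableTask = () -> { for (int i = 0; i < 500; i++) CheckResults.counter++; };

      new Thread(runnableTask).start(); // the lambda expression (runnableTask) becomes the code of the run() method

      while (CheckResults.counter < 100) {
         System.out.println("Not reached yet");
         Thread.sleep(1000); // read CheckResults.counter once a second; Thread.sleep() throws the checked InterruptedException
      }

      System.out.println("Reached!");
   }
}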

// Constants of Thread class:
public static final int   MIN_PRIORITY = 1;
public static final int   NORM_PRIORITY = 5;
public static final int   MAX_PRIORITY = 10;

// WAY 2:

// The java.lang.Runnable interface is a functional interface, that is, an interface with one abstract method. The following is the definition of the Runnable interface:
@FunctionalInterface public interface Runnable {
   void run();
}

// The following lambda expressions each rely on the Runnable interface:
() -> System.out.println("Hello World")
() -> {int i=10; i++;}
() -> {return;}
() -> {}
// Notice that all of these lambda expressions start with a set of empty parentheses, (). Also, note that none of them return a value. For these reasons, the following lambdas, while valid for other functional interfaces, are not compatible with Runnable because they each return a value:
() -> ""
() -> 5
() -> {return new Object();}

class PrimeRun implements Runnable {
   long minPrime;
   PrimeRun(long minPrime) {this.minPrime = minPrime;}
   public void run() {/* ...compute primes larger than minPrime... */}
}
// The following code would then create a thread and start it running:
PrimeRun p = new PrimeRun(143);
new Thread(p).start(); // pass an object of your class as a parameter to the constructor of an object of type Thread
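// A minimal sketch that puts the two ways side by side. The names TwoWaysDemo, CounterThread and CounterTask are made up for this illustration. join() is used so that the main thread reads the totals only after both threads have finished.

```java
// Hypothetical demo class: starts one thread per "way" and waits for both.
class TwoWaysDemo {
   // WAY 1: a Thread subclass that overrides run()
   static class CounterThread extends Thread {
      int total = 0;
      @Override public void run() { for (int i = 1; i <= 100; i++) total += i; }
   }

   // WAY 2: a Runnable that is handed to a plain Thread object
   static class CounterTask implements Runnable {
      int total = 0;
      @Override public void run() { for (int i = 1; i <= 100; i++) total += i; }
   }

   static int[] runBoth() throws InterruptedException {
      CounterThread way1 = new CounterThread();
      CounterTask task = new CounterTask();
      Thread way2 = new Thread(task);
      way1.start();
      way2.start();
      way1.join(); // wait for the thread to finish; its writes are visible after join() returns
      way2.join();
      return new int[] { way1.total, task.total };
   }

   public static void main(String[] args) throws InterruptedException {
      int[] totals = runBoth();
      System.out.println(totals[0] + " " + totals[1]); // 5050 5050
   }
}
```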

// It is extremely common when interviewing for a Java development position to be asked to explain the difference between extending the Thread class and implementing Runnable. The following are some reasons to prefer one method over the other in Java:
// * If you need to define your own Thread rules upon which multiple tasks will rely, such as a priority Thread, extending Thread may be preferable.
// * Since Java doesn't support multiple inheritance, extending Thread does not allow you to extend any other class, whereas implementing Runnable lets you extend another class.
// * Implementing Runnable is often a better object-oriented design practice since it separates the task being performed from the Thread object performing it.
// * Implementing Runnable allows the class to be used by numerous Concurrency API classes.
// * We can now use the ExecutorService to perform thread tasks without having to create Thread objects directly.

// The Callable interface:
// java.util.concurrent.Callable interface is similar to Runnable except that its call() method returns a value and can throw a checked exception (as you may remember from the definition of Runnable, the run() method returns void and cannot throw any checked exceptions). The Callable interface was introduced as an alternative to the Runnable interface, since it allows more details to be retrieved easily from the task after it is completed. Callable was also made a functional interface. The definition:
@FunctionalInterface public interface Callable<V> {
   V call() throws Exception;
}

// ####### ExecutorService:

// ExecutorService is an interface that extends the Executor interface. It creates and manages threads for you. You first obtain an instance of an ExecutorService interface, and then you send the service tasks to be processed. The framework includes numerous useful features, such as thread pooling and scheduling. It is recommended that you use this framework anytime you need to create and execute a separate task, even if you need only a single thread.

// Important method of the parent interface Executor:
void execute(Runnable command) // executes the given command at some time in the future.

// The Concurrency API includes the Executors factory class that can be used to create instances of the ExecutorService object.
import java.util.concurrent.*;
public class ZooInfo {
   public static void main(String[] args) {
      ExecutorService executorService = null;
      try {
         executorService = Executors.newSingleThreadExecutor(); // obtain an instance of an ExecutorService interface from the factory
         System.out.println("begin");
         executorService.execute(() -> System.out.println("Printing zoo inventory"));
         executorService.execute(() -> { for (int i = 0; i < 3; i++) System.out.println("Printing record: " + i ); });
         executorService.execute(() -> System.out.println("Printing zoo inventory"));
         System.out.println("end");
      } finally {
         // Before Java 19, the ExecutorService interface did not extend AutoCloseable, so you could not use a try-with-resources statement. A finally block for cleanup works in every version.
         if (executorService != null) executorService.shutdown();
      }
   }
}
// For example, the following is a possible output for this code snippet:
// begin
// Printing zoo inventory
// Printing record: 0
// Printing record: 1
// end
// Printing record: 2
// Printing zoo inventory
// With a single-thread executor, tasks are guaranteed to be executed in the order in which they are added to the executor service. Notice that the end text is output while our thread executor tasks are still running. This is because the main() method runs in a thread that is independent of the ExecutorService's thread, and it can continue working while the other thread is running.

// Shutting Down a Thread Executor:
// Once you have finished using a thread executor, it is important that you call the shutdown() method. A thread executor creates a non-daemon thread on the first task that is executed, so failing to call shutdown() after at least one task has been submitted will result in the program hanging instead of terminating. THE SHUTDOWN PROCESS FOR A THREAD EXECUTOR INVOLVES FIRST REJECTING ANY NEW TASKS SUBMITTED TO THE THREAD EXECUTOR WHILE CONTINUING TO EXECUTE ANY PREVIOUSLY SUBMITTED TASKS. During this time, calling isShutdown() will return true, while isTerminated() will return false. If a new task is submitted to the thread executor while it is shutting down, a RejectedExecutionException will be thrown. Once all active tasks have been completed, isShutdown() and isTerminated() will both return true. The life cycle of an ExecutorService object:
// @ Create New Thread Executor.
// @ The Executor is ACTIVE: Accepts New Tasks, Executes Tasks, isShutdown() = false, isTerminated() = false.
// @ Call shutdown()
// @ The Executor is SHUTTING DOWN: Rejects New Tasks, Executes Tasks, isShutdown() = true, isTerminated() = false.
// @ All Tasks Finished.
// @ The Executor is SHUTDOWN: Rejects New Tasks, No Tasks Running, isShutdown() = true, isTerminated() = true.
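// The life cycle above can be observed with a sketch like the following. LifecycleDemo is a hypothetical class; a CountDownLatch keeps one task blocked so that the SHUTTING DOWN state lasts long enough to be inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
   // Returns { isShutdown while ACTIVE, isShutdown after shutdown(), isTerminated while a task
   // is still blocked, new task rejected, terminated after the task was released }.
   static boolean[] observe() throws InterruptedException {
      ExecutorService service = Executors.newSingleThreadExecutor();
      CountDownLatch release = new CountDownLatch(1);
      service.execute(() -> {
         try {
            release.await(); // keep the task alive until the main thread releases it
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
      });
      boolean shutdownWhileActive = service.isShutdown();
      service.shutdown(); // ACTIVE -> SHUTTING DOWN
      boolean shutdownAfterCall = service.isShutdown();
      boolean terminatedWhileBlocked = service.isTerminated();
      boolean rejected = false;
      try {
         service.execute(() -> { }); // new tasks are no longer accepted
      } catch (RejectedExecutionException e) {
         rejected = true;
      }
      release.countDown(); // let the blocked task finish
      boolean terminated = service.awaitTermination(5, TimeUnit.SECONDS) && service.isTerminated();
      return new boolean[] { shutdownWhileActive, shutdownAfterCall, terminatedWhileBlocked, rejected, terminated };
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println(java.util.Arrays.toString(observe())); // [false, true, false, true, true]
   }
}
```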

// ####### Submitting Tasks with ExecutorService:

// The execute() method of ExecutorService takes a Runnable lambda expression or instance and completes the task asynchronously. Because the return type of the method is void, it does not tell us anything about the result of the task. It is considered a "fire-and-forget" method, as once it is submitted, the results are not directly available to the calling thread. Fortunately, the designers of Java added submit() methods to the ExecutorService interface, which, like execute(), can be used to complete tasks asynchronously. Unlike execute(), though, submit() returns a Future object that can be used to determine if the task is complete. It can also be used to return a generic result object after the task has been completed.

// ExecutorService methods:

Future<?> submit(Runnable task)
// Executes a Runnable task at some point in the future and returns a Future representing the pending results of the task. THIS OVERLOAD IS CALLED IF THE PASSED LAMBDA RETURNS VOID.

<T> Future<T> submit(Callable<T> task)
// Executes a Callable task at some point in the future and returns a Future representing the pending results of the task. THIS OVERLOAD IS CALLED IF THE PASSED LAMBDA RETURNS A VALUE. The type of that value replaces T in the generics.

// The submit() methods execute one task. The following 2 methods execute a collection of tasks:

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
// Executes all tasks in a provided collection and returns a List of ordered Future objects, with one Future object corresponding to each submitted task, in the order they were in the original collection. Even though Future.isDone() returns true for each element in the returned List, a task could have completed normally or thrown an exception. Will wait indefinitely until ALL TASKS are complete.

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
// Executes a collection of tasks and returns the result of one of the tasks that successfully completes execution, cancelling all unfinished tasks. While the first task to finish is often returned, this behavior is not guaranteed, as any completed task can be returned by this method. Will wait indefinitely until AT LEAST ONE TASK completes.

// As you might have noticed, the execute() and submit() methods are nearly identical when applied to Runnable expressions. The submit() method has the obvious advantage of doing the exact same thing execute() does, but with a return object that can be used to track the result. Because of this advantage and the fact that execute() does not support Callable expressions, we tend to prefer submit() over execute(), even if you don't store the Future reference.

// Both invokeAll() and invokeAny() take a Collection object containing a list of tasks to execute. Both of these methods also execute synchronously. By synchronous, we mean that unlike the other methods used to submit tasks to a thread executor, these methods will wait until the results are available before returning control to the enclosing program.

// The ExecutorService interface also includes overloaded versions of invokeAll() and invokeAny() that take a timeout value and a parameter of TimeUnit enum type (NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS).
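// A short sketch of both bulk methods. InvokeDemo and its animal names are hypothetical. invokeAll() keeps the order of the submitted collection, while invokeAny() may return any one of the results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeDemo {
   private static List<Callable<String>> animals() {
      List<Callable<String>> tasks = new ArrayList<>();
      tasks.add(() -> "lion");
      tasks.add(() -> "tiger");
      tasks.add(() -> "bear");
      return tasks;
   }

   // invokeAll(): blocks until every task is done; the Futures come back in submission order
   static String runAll() throws InterruptedException, ExecutionException {
      ExecutorService service = Executors.newFixedThreadPool(3);
      try {
         List<String> names = new ArrayList<>();
         for (Future<String> future : service.invokeAll(animals())) names.add(future.get());
         return String.join(" ", names);
      } finally {
         service.shutdown();
      }
   }

   // invokeAny(): blocks until one task has completed successfully and returns its result
   static String runAny() throws InterruptedException, ExecutionException {
      ExecutorService service = Executors.newFixedThreadPool(3);
      try {
         return service.invokeAny(animals());
      } finally {
         service.shutdown();
      }
   }

   public static void main(String[] args) throws Exception {
      System.out.println(runAll()); // lion tiger bear
      System.out.println(runAny()); // one of the three names; which one is not guaranteed
   }
}
```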

// >>>>>>> Interface java.util.concurrent.Future<V> to determine the state of a task. Waiting for results:

// How do we know when a task submitted to an ExecutorService is complete? The submit() method returns a Future<V> object, that can be used to determine this result. A Future represents the result of an asynchronous computation.
Future<?> future = executorService.submit(() -> System.out.println("Hello Zoo"));
// The Future interface includes methods that are useful in determining the state of a task:
boolean isDone() // Returns true if the task IS NOT RUNNING (i.e. was completed, threw an exception, or was cancelled).
boolean isCancelled() // Returns true if the task was cancelled before it completed normally.
boolean cancel(boolean mayInterruptIfRunning) // Attempts to cancel execution of the task.
V get() // Retrieves the result of a task, waiting endlessly if it is not yet available.
V get(long timeout, TimeUnit unit) // Retrieves the result of a task, waiting the specified amount of time. If the result is not ready by the time the timeout is reached, a checked TimeoutException will be thrown.
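// The methods above can be exercised on a task that never finishes on its own. This is only a sketch: FutureStateDemo is a hypothetical class, and the 60-second sleep stands in for any long-running job.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureStateDemo {
   // Returns { isDone while running, timed get() timed out, cancel() accepted,
   // isCancelled, isDone after cancel, get() after cancel threw CancellationException }.
   static boolean[] observe() throws InterruptedException, ExecutionException {
      ExecutorService service = Executors.newSingleThreadExecutor();
      try {
         // The lambda returns a value, so the Callable overload of submit() is chosen
         Future<String> future = service.submit(() -> { Thread.sleep(60_000); return "done"; });
         boolean doneWhileRunning = future.isDone();
         boolean timedOut = false;
         try {
            future.get(100, TimeUnit.MILLISECONDS); // the result cannot be ready this quickly
         } catch (TimeoutException e) {
            timedOut = true;
         }
         boolean cancelAccepted = future.cancel(true); // true: interrupt the worker thread if needed
         boolean sawCancellation = false;
         try {
            future.get();
         } catch (CancellationException e) { // unchecked; thrown by get() on a cancelled task
            sawCancellation = true;
         }
         return new boolean[] { doneWhileRunning, timedOut, cancelAccepted,
                                future.isCancelled(), future.isDone(), sawCancellation };
      } finally {
         service.shutdown();
      }
   }
}
```

// Note that isDone() returns true after the cancellation even though the task never produced a result.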

// The following is an updated version of our earlier polling example CheckResults class, which uses a Future instance to poll for the results:
import java.util.concurrent.*;
public class CheckResults {
   private static int counter = 0;
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService executorService = null;
      try {
         executorService = Executors.newSingleThreadExecutor();
         Future<?> future = executorService.submit(() -> { for(int i = 0; i < 500; i++) CheckResults.counter++; }); // since the lambda returns void, the Runnable overload is called.
         future.get(10, TimeUnit.SECONDS); // wait at most 10 seconds, throwing a TimeoutException if the task is not done
         System.out.println("Reached!");
      } catch (TimeoutException e) {
         System.out.println("Not reached in time");
      } finally {
         if (executorService != null) executorService.shutdown();
      }
   }
}
// This example is similar to our earlier polling implementation, but it does not use the Thread class directly. In part, this is the essence of the Concurrency API: to do complex things with threads without using the Thread class directly. It also waits at most 10 seconds, throwing a TimeoutException if the task is not done. What is the return value of this task? As Future<V> is a generic interface, the type V is determined by the return type of the submitted task. Since the return type of Runnable.run() is void, the get() method always returns null. There is another task interface compatible with ExecutorService that supports other return types: Callable.

// Unlike with a Runnable task, for which get() always returns null, the get() methods of a Future obtained from a Callable task return a value of the matching generic type (which may also be null). Let’s take a look at an example using Callable instead of Runnable:
import java.util.concurrent.*;
public class AddData {
   public static void main(String[] args) throws InterruptedException, ExecutionException {
      ExecutorService executorService = null;
      try {
         executorService = Executors.newSingleThreadExecutor();
         Future<Integer> future = executorService.submit(() -> 30 + 11); // since the lambda returns a value, the Callable<T> overload is called.
                                                         // The lambda returns int, which will be boxed to Integer, so the return type of submit() is Future<Integer>
         Integer taskResult = future.get(); // obtain the value, returned by the lambda
         System.out.println(taskResult); // 41
      } finally {
         if (executorService != null) executorService.shutdown();
      }
   }
}

// Checked Exceptions in Callable and Runnable:
// Besides having a return type, the Callable interface also supports checked exceptions, whereas the Runnable interface does not without an embedded try/catch block. Given an instance of ExecutorService called executorService, which of the following lines of code will or will not compile?
executorService.submit(() -> {Thread.sleep(1000); return null;});
executorService.submit(() -> {Thread.sleep(1000);});
// The first line will compile, while the second line will not. Why? Thread.sleep() throws a checked InterruptedException. Since the first lambda expression has a return type, the compiler treats this as a Callable expression that supports checked exceptions. The second lambda expression does not return a value; therefore, the compiler treats this as a Runnable expression. Since Runnable methods do not support checked exceptions, the compiler will report an error trying to compile this code snippet.
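// What happens to a checked exception that a Callable actually throws? It does not reach the submitting thread directly: Future.get() wraps it in an ExecutionException. A sketch, in which the class name CallableFailureDemo and the exception message are invented for the illustration:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableFailureDemo {
   static String causeOfFailure() throws InterruptedException {
      ExecutorService service = Executors.newSingleThreadExecutor();
      try {
         // call() is declared as "throws Exception", so no try/catch is needed inside the lambda
         Callable<String> task = () -> { throw new IOException("feed file missing"); };
         Future<String> future = service.submit(task);
         try {
            return future.get();
         } catch (ExecutionException e) {
            Throwable cause = e.getCause(); // the exception thrown inside call()
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
         }
      } finally {
         service.shutdown();
      }
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println(causeOfFailure()); // IOException: feed file missing
   }
}
```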

// Waiting for All Tasks to Finish:
// After submitting a set of tasks to a thread executor, it is common to wait for the results. As you saw in the previous sections, one solution is to call get() on each Future object returned by the submit() method. If we don’t need the results of the tasks and are finished using our thread executor, there is a simpler approach. First, we shut down the thread executor. Next, use the awaitTermination(long timeout, TimeUnit unit) method available for all thread executors. The method waits the specified time to complete all tasks, returning sooner if all tasks finish or an InterruptedException is detected:
ExecutorService executorService = null;
try {
   executorService = Executors.newSingleThreadExecutor();
   // Add tasks to the thread executor...
} finally {
   if (executorService != null) executorService.shutdown();
}
if (executorService != null) {
   executorService.awaitTermination(1, TimeUnit.MINUTES); // wait up to one minute for the results.
   // Check whether all tasks are finished
   if (executorService.isTerminated()) // returns true if all tasks are actually finished
      System.out.println("All tasks finished");
   else
      System.out.println("At least one task is still running");
}

// Scheduling Tasks:
// The ScheduledExecutorService (a subinterface of ExecutorService) can be used to schedule a task to run after a delay or repeatedly, at some set interval. Methods of ScheduledExecutorService (all return ScheduledFuture<V>, which is identical to Future<V> except that it also includes a getDelay(TimeUnit unit) method that returns the delay remaining until the task is due to run):

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// Creates and executes a Callable task after the given delay.

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// Creates and executes a Runnable task after the given delay.

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// Creates a Runnable task and schedules it to run every period (first time - after the given initial delay). THE PERIOD IS COUNTED FROM THE START OF EACH EXECUTION, REGARDLESS OF HOW LONG THE PREVIOUS EXECUTION TOOK. One risk of using this method is the possibility that a task could consistently take longer to run than the period between tasks. According to the Javadoc, executions of the same task never overlap: if one execution overruns its period, the following executions start late, so the task falls further and further behind its schedule and ends up running back to back.

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
// Creates a Runnable task and runs it repeatedly (first time - after the given initial delay), STARTING THE DELAY COUNT AFTER THE PREVIOUS EXECUTION HAS FINISHED. For example, if the first execution starts at 12:00 and takes five minutes to finish, with a delay of 2 minutes, then the second execution will start at 12:07.

// Notice that neither of the methods, scheduleWithFixedDelay() and scheduleAtFixedRate(), takes a Callable object as an input parameter.

// Example:
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
Runnable runnableTask = () -> System.out.println("Hello Zoo");
Callable<String> callableTask = () -> "Monkey";
ScheduledFuture<?> result1 = scheduledExecutorService.schedule(runnableTask, 10, TimeUnit.SECONDS); // execute the task once after 10 seconds
ScheduledFuture<String> result2 = scheduledExecutorService.schedule(callableTask, 8, TimeUnit.MINUTES); // execute the task once after 8 minutes
scheduledExecutorService.scheduleAtFixedRate(runnableTask, 5, 1, TimeUnit.MINUTES); // execute the task after 5 minutes of delay and then repeat every 1 minute
scheduledExecutorService.scheduleWithFixedDelay(runnableTask, 0, 2, TimeUnit.MINUTES); // execute the task immediately and repeat each time 2 minutes after finishing the previous execution

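// While these tasks are scheduled in the future, the actual execution may be delayed. For example, there may be no threads available to perform a task, at which point it will just wait in the queue. Also, if the ScheduledExecutorService is shut down before a task's scheduled execution time is reached, the task may be discarded: shutdownNow() discards every task that has not started, while after shutdown() the default policy of ScheduledThreadPoolExecutor cancels periodic tasks but still runs one-shot delayed tasks.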
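// A runnable sketch of one-shot and repeated scheduling, with millisecond delays so that it finishes quickly. ScheduleDemo is a hypothetical class, and the CountDownLatch is only a way for the calling thread to wait for a given number of repetitions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleDemo {
   // Runs a Callable once, 100 milliseconds from now, and waits for its result
   static String runOnce() throws InterruptedException, ExecutionException {
      ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
      try {
         ScheduledFuture<String> result = service.schedule(() -> "Monkey", 100, TimeUnit.MILLISECONDS);
         return result.get(); // blocks until the delayed task has run
      } finally {
         service.shutdown();
      }
   }

   // Repeats a Runnable with a 20 ms pause after each execution;
   // returns true if it ran at least 'times' times within 5 seconds
   static boolean runRepeatedly(int times) throws InterruptedException {
      ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
      CountDownLatch remaining = new CountDownLatch(times);
      try {
         service.scheduleWithFixedDelay(remaining::countDown, 0, 20, TimeUnit.MILLISECONDS);
         return remaining.await(5, TimeUnit.SECONDS);
      } finally {
         service.shutdownNow(); // a periodic task never completes by itself, so stop it explicitly
      }
   }

   public static void main(String[] args) throws Exception {
      System.out.println(runOnce());        // Monkey
      System.out.println(runRepeatedly(3));
   }
}
```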

// Increasing Concurrency with Pools:
// We now present three additional factory methods in the Executors class that act on a pool of threads, rather than on a single thread. A thread pool is a group of reusable threads that are available to perform a set of arbitrary tasks. The difference between a single-thread and a pooled-thread executor is what happens when a task is already running. While a single-thread executor will wait for its only thread to become available before running the next task, a pooled-thread executor can execute the next task concurrently. If the pool runs out of available threads, the task will be queued by the thread executor and wait to be completed. Here are our two previous single-thread executor methods, along with the new multi-threaded methods:

ExecutorService newSingleThreadExecutor()
// Creates a single-threaded executor that uses a single worker thread operating off an unbounded queue. Results are processed sequentially in the order in which they are submitted.

ScheduledExecutorService newSingleThreadScheduledExecutor()
// Creates a single-threaded executor that can schedule commands to run after a given delay or to execute periodically.

ExecutorService newCachedThreadPool()
// Creates a thread pool OF UNBOUNDED SIZE that creates new threads as needed, but will reuse previously constructed threads when they are available. This is commonly used for pools that require executing many short-lived asynchronous tasks. For long-lived processes, usage of this executor is strongly discouraged, as it could grow to encompass a large number of threads over the application life cycle.

ExecutorService newFixedThreadPool(int nThreads /* the number of threads in the pool */)
// Creates a thread pool that REUSES A FIXED NUMBER OF THREADS operating off a shared unbounded queue. Takes the maximum number of threads; the threads are created as tasks arrive, until the pool holds nThreads of them. As long as the number of tasks does not exceed the number of threads, all tasks will be executed concurrently. If at any point the number of tasks exceeds the number of threads in the pool, the extra tasks will wait in a similar manner to what you saw with a single-thread executor. In fact, calling newFixedThreadPool(1) is nearly equivalent to calling newSingleThreadExecutor() (the latter returns an executor that cannot be reconfigured to use additional threads).

ScheduledExecutorService newScheduledThreadPool(int nThreads)
// Creates a thread pool that can schedule commands to run after a given delay or to execute periodically. Identical to newFixedThreadPool(), except that it returns an instance of ScheduledExecutorService and is therefore compatible with scheduling tasks. This executor has subtle differences in the way that the scheduleAtFixedRate() performs. For example, recall our previous example in which tasks are executed after 5 minutes delay and then repeated every 1 minute:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
scheduledExecutorService.scheduleAtFixedRate(runnableTask, 5, 1, TimeUnit.MINUTES); // runnableTask is the Runnable from the previous example
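// With a single-thread executor and a five-minute task execution time, the task would run back to back, falling ever further behind its schedule and leaving no thread for any other scheduled task. With a pooled executor, the other tasks are not starved: if the pool size is sufficiently large, 10 for example, then as each thread finishes, it is returned to the pool and is available for the next tasks as they come up. Executions of the same periodic task still never overlap, so a task that takes longer than its period runs late on a pool as well.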

// Most tasks are dependent on some other resources, such as a database, file system, or network. In those situations, creating large thread pools is generally safe, as the tasks are not CPU intensive and may involve a lot of waiting for external resources to become available.
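// One way to see the difference between a single-thread and a pooled executor is to submit tasks that can only finish if they run at the same time. In this sketch (PoolDemo is a hypothetical class) three tasks must meet at a CyclicBarrier within one second.

```java
import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PoolDemo {
   // Returns true only if all 'tasks' tasks were running at the same moment
   private static boolean canMeet(ExecutorService service, int tasks)
         throws InterruptedException, ExecutionException {
      CyclicBarrier barrier = new CyclicBarrier(tasks);
      Callable<Boolean> task = () -> {
         try {
            barrier.await(1, TimeUnit.SECONDS); // wait for the other tasks to arrive
            return true;
         } catch (TimeoutException | BrokenBarrierException e) {
            return false; // the others never showed up in time
         }
      };
      try {
         boolean allMet = true;
         for (Future<Boolean> f : service.invokeAll(Collections.nCopies(tasks, task))) allMet &= f.get();
         return allMet;
      } finally {
         service.shutdown();
      }
   }

   static boolean fixedPoolMeets() throws InterruptedException, ExecutionException {
      return canMeet(Executors.newFixedThreadPool(3), 3); // three threads: the tasks overlap
   }

   static boolean singleThreadMeets() throws InterruptedException, ExecutionException {
      return canMeet(Executors.newSingleThreadExecutor(), 3); // one thread: the tasks run one by one
   }

   public static void main(String[] args) throws Exception {
      System.out.println(fixedPoolMeets());    // true
      System.out.println(singleThreadMeets()); // false
   }
}
```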

// ####### Synchronizing Data Access. Protecting Data with Atomic Classes

// Atomic is the property of an operation to be carried out as a single unit of execution without any interference by another thread. A thread-safe atomic version of the increment operator would be one that performs the read and write of the variable as a single operation, not allowing any other threads to access the variable during the operation. ANY THREAD TRYING TO ACCESS AN ATOMIC VARIABLE WHILE AN ATOMIC OPERATION IS IN PROGRESS WILL HAVE TO WAIT UNTIL THE ATOMIC OPERATION ON THE VARIABLE IS COMPLETE.

// Atomic classes:
// AtomicBoolean          A boolean value that may be updated atomically
// AtomicInteger          An int value that may be updated atomically
// AtomicIntegerArray     An int array in which elements may be updated atomically
// AtomicLong             A long value that may be updated atomically
// AtomicLongArray        A long array in which elements may be updated atomically
// AtomicReference        A generic object reference that may be updated atomically
// AtomicReferenceArray   An array of generic object references in which elements may be updated atomically

// Common atomic methods:
// get()               Retrieve the current value
// set()               Set the given value, equivalent to the assignment = operator
// getAndSet()         Atomically sets the new value and returns the old value
// incrementAndGet()   For numeric classes, atomic pre-increment operation equivalent to ++value
// getAndIncrement()   For numeric classes, atomic post-increment operation equivalent to value++
// decrementAndGet()   For numeric classes, atomic pre-decrement operation equivalent to --value
// getAndDecrement()   For numeric classes, atomic post-decrement operation equivalent to value--

// ++ is NOT thread-safe because it consists of several actions: reading the current value, adding 1 to it and writing the result back. When thread A has read the variable but has not written the new value yet, thread B can read the same old value, so both threads write the same result and one increment is lost. The following code is NOT thread-safe because of the ++ operation:
private int sheepCount = 0;
private void incrementAndReport() { System.out.print((++sheepCount) + " "); }
// The thread-safe version:
private AtomicInteger sheepCount = new AtomicInteger(0);
private void incrementAndReport() { System.out.print(sheepCount.incrementAndGet() + " "); }
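// The lost updates can be made visible by running many increments in parallel. A sketch with a hypothetical class SheepCounterDemo: the plain int version may return less than the expected total, while the AtomicInteger version always returns exactly threads * perTask.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SheepCounterDemo {
   // Runs the same task once on each thread of a fixed pool and waits until all are done
   private static void runOnPool(int threads, Runnable task) throws InterruptedException {
      ExecutorService service = Executors.newFixedThreadPool(threads);
      try {
         for (int t = 0; t < threads; t++) service.execute(task);
      } finally {
         service.shutdown();
      }
      if (!service.awaitTermination(10, TimeUnit.SECONDS))
         throw new IllegalStateException("tasks did not finish in time");
   }

   // NOT thread-safe: ++ on a plain int can lose updates
   static int countWithPlainInt(int threads, int perTask) throws InterruptedException {
      int[] sheepCount = new int[1]; // a one-element array, so that the lambda can modify it
      runOnPool(threads, () -> { for (int i = 0; i < perTask; i++) sheepCount[0]++; });
      return sheepCount[0];
   }

   // Thread-safe: incrementAndGet() reads, increments and writes as one atomic step
   static int countWithAtomic(int threads, int perTask) throws InterruptedException {
      AtomicInteger sheepCount = new AtomicInteger(0);
      runOnPool(threads, () -> { for (int i = 0; i < perTask; i++) sheepCount.incrementAndGet(); });
      return sheepCount.get();
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println(countWithPlainInt(4, 10000)); // at most 40000, often less
      System.out.println(countWithAtomic(4, 10000));   // 40000
   }
}
```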

// synchronized block:
// BLOCK OF CODE WHICH CAN BE EXECUTED ONLY BY ONE THREAD AT A TIME. Each thread that arrives will first check whether another thread already holds the lock of the object named in the parentheses. If the lock is available, a single thread will enter the block, acquiring the lock and preventing all other threads from entering. While the first thread is executing the block, all threads that arrive will attempt to acquire the same lock and wait for the first thread to finish. Once a thread finishes executing the block, it will release the lock, allowing one of the waiting threads to proceed:
SheepManager manager = new SheepManager();
synchronized(manager) {
   // Work to be completed by one thread at a time
}
// The sheepCount example re-written:
private int sheepCount = 0;
private void incrementAndReport() {
   synchronized(this) {
      System.out.print((++sheepCount) + " ");
   }
}
// We could have used an atomic count variable (AtomicInteger sheepCount) along with the synchronized block in this example, although it is unnecessary. Since synchronized blocks allow only one thread to enter, we're not gaining any improvement by using an atomic variable if the only time that we access the variable is within a synchronized block.

// synchronized method:
// A METHOD WHICH CAN BE EXECUTED ONLY BY ONE THREAD AT A TIME. In the previous example, we used synchronized(this) around the body of the method. Java actually provides a more convenient compiler enhancement for doing so. We can add the synchronized modifier to any instance method to synchronize automatically on the object itself. For example, the following method definition is equivalent to the previous, synchronized(this) version:
private synchronized void incrementAndReport() { System.out.print((++sheepCount) + " "); }
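// Synchronized methods are also a simple way to protect a compound "check, then act" step, which a single atomic increment cannot cover. A sketch under invented names (SheepPen, tryAdd, fill): several threads try to add sheep to a pen with a limited capacity, and the pen is never overfilled.

```java
class SheepPen {
   private final int capacity;
   private int sheepCount = 0;

   SheepPen(int capacity) { this.capacity = capacity; }

   // The check and the increment run as one unit: only one thread at a time can be in here
   synchronized boolean tryAdd() {
      if (sheepCount >= capacity) return false;
      sheepCount++;
      return true;
   }

   synchronized int count() { return sheepCount; }

   // Starts 'threads' threads that each try to add 'attemptsPerThread' sheep; returns the final count
   static int fill(int capacity, int threads, int attemptsPerThread) throws InterruptedException {
      SheepPen pen = new SheepPen(capacity);
      Thread[] workers = new Thread[threads];
      for (int t = 0; t < threads; t++) {
         workers[t] = new Thread(() -> { for (int i = 0; i < attemptsPerThread; i++) pen.tryAdd(); });
         workers[t].start();
      }
      for (Thread worker : workers) worker.join();
      return pen.count();
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println(fill(100, 8, 1000)); // 100
   }
}
```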

// ####### Concurrent Collections

// You should use a concurrent collection class anytime that you are going to have multiple threads modify a collection object outside a synchronized block or method, even if you don’t expect a concurrency problem. On the other hand, if all of the threads are accessing an established immutable or read-only collection, a concurrent collection class is not required.

// Concurrent Class:       Java Collections Framework Interface:
// ConcurrentHashMap       ConcurrentMap
// ConcurrentLinkedDeque   Deque
// ConcurrentLinkedQueue   Queue
// ConcurrentSkipListMap   ConcurrentMap, SortedMap, NavigableMap
// ConcurrentSkipListSet   SortedSet, NavigableSet
// CopyOnWriteArrayList    List
// CopyOnWriteArraySet     Set
// LinkedBlockingDeque     BlockingQueue, BlockingDeque
// LinkedBlockingQueue     BlockingQueue

// In the same way that we instantiate an ArrayList object but pass around a List reference, it is considered a good practice to instantiate a concurrent collection but pass it around using a non-concurrent interface whenever possible:
public class ZooManager {
   private Map<String,Object> foodData = new ConcurrentHashMap<String,Object>(); // a non-concurrent-type variable points to a concurrent-type object
   public void put(String key, String value) { foodData.put(key, value); }
   public Object get(String key) { return foodData.get(key); }
}
// ConcurrentHashMap implements the ConcurrentMap interface. You can use either reference type, Map or ConcurrentMap, to access a ConcurrentHashMap object, depending on whether or not you want the caller to know anything about the underlying implementation. For example, a method signature may require a ConcurrentMap reference to ensure that the object passed to it is properly supported in a multi-threaded environment.
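
// A short sketch of a method signature that requires ConcurrentMap. The class FoodRegistry and the method registerIfNew() are hypothetical names. The parameter type tells the caller that a thread-safe map is expected, and ConcurrentMap guarantees that putIfAbsent() is atomic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class FoodRegistry {
   // Returns true if the key was absent and has now been stored, false if it already had a value.
   static boolean registerIfNew(ConcurrentMap<String, Object> foodData, String key, Object value) {
      return foodData.putIfAbsent(key, value) == null;
   }

   public static void main(String[] args) {
      ConcurrentMap<String, Object> foodData = new ConcurrentHashMap<>();
      System.out.println(registerIfNew(foodData, "zebra", 52)); // prints true
      System.out.println(registerIfNew(foodData, "zebra", 99)); // prints false
      System.out.println(foodData.get("zebra"));                // prints 52
   }
}
```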

// Two of the classes listed above implement blocking interfaces: LinkedBlockingQueue and LinkedBlockingDeque.

// The BlockingQueue is just like a regular Queue, except that it includes methods that will wait a specific amount of time to complete an operation:
offer(E e, long timeout, TimeUnit unit) // Adds an item to the queue, waiting up to the specified time and returning false if the time elapses before space is available
poll(long timeout, TimeUnit unit) // Retrieves and removes an item from the queue, waiting up to the specified time and returning null if the time elapses before an item is available

// The following sample uses a LinkedBlockingQueue to wait for the results of some of the operations:
try {
   BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>();
   blockingQueue.offer(39); // since LinkedBlockingQueue implements both Queue and BlockingQueue, we can use methods available to both, such as those that don't take any wait arguments
   blockingQueue.offer(3, 4, TimeUnit.SECONDS); // adds the element 3, waiting up to 4 seconds for space
   System.out.println(blockingQueue.poll()); // prints 39
   System.out.println(blockingQueue.poll(10, TimeUnit.MILLISECONDS)); // prints 3
} catch (InterruptedException e) {
   // Handle interruption (the timed versions of offer() and poll() can throw a checked InterruptedException, as they can be interrupted before they finish waiting for a result)
}
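
// A minimal sketch of the two timeout outcomes and of a hand-off between threads. The class BlockingQueueDemo and its method names are hypothetical, and the capacity and delays are arbitrary values picked for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueDemo {
   // A bounded queue (capacity 1) that is already full: the timed offer() gives up and returns false.
   static boolean offerToFullQueue() {
      BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
      try {
         queue.offer(1);
         return queue.offer(2, 50, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("Interrupted while offering", e);
      }
   }

   // An empty queue: the timed poll() gives up and returns null.
   static Integer pollFromEmptyQueue() {
      BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
      try {
         return queue.poll(50, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("Interrupted while polling", e);
      }
   }

   // A producer thread adds an item after a short delay; the caller waits for it with a timed poll().
   static Integer handOff() {
      BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
      Thread producer = new Thread(() -> {
         try {
            Thread.sleep(100);
            queue.put(42);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         }
      });
      producer.start();
      try {
         return queue.poll(5, TimeUnit.SECONDS); // returns as soon as the item arrives
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException("Interrupted while polling", e);
      }
   }

   public static void main(String[] args) {
      System.out.println(offerToFullQueue());   // prints false
      System.out.println(pollFromEmptyQueue()); // prints null
      System.out.println(handOff());            // prints 42
   }
}
```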
// The LinkedBlockingDeque class maintains a doubly linked list between elements and implements the BlockingDeque interface. The BlockingDeque interface extends Deque in much the same way that BlockingQueue extends Queue, providing numerous waiting methods:

offerFirst(E e, long timeout, TimeUnit unit)
// Adds an item to the front of the queue, waiting a specified time, returning false if time elapses before space is available

offerLast(E e, long timeout, TimeUnit unit)
// Adds an item to the tail of the queue, waiting a specified time, returning false if time elapses before space is available

pollFirst(long timeout, TimeUnit unit)
// Retrieves and removes an item from the front of the queue, waiting the specified time, returning null if the time elapses before the item is available

pollLast(long timeout, TimeUnit unit)
// Retrieves and removes an item from the tail of the queue, waiting the specified time, returning null if the time elapses before the item is available

// This example creates a LinkedBlockingDeque and assigns it to a BlockingDeque reference. Since BlockingDeque extends Queue, Deque, and BlockingQueue, all of the previously defined queue methods are available for use:
try {
   BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();
   blockingDeque.offer(91);
   blockingDeque.offerFirst(5, 2, TimeUnit.MINUTES);
   blockingDeque.offerLast(47, 100, TimeUnit.MICROSECONDS);
   blockingDeque.offer(3, 4, TimeUnit.SECONDS);
   System.out.println(blockingDeque.poll()); // prints 5 (the deque held 5, 91, 47, 3)
   System.out.println(blockingDeque.poll(950, TimeUnit.MILLISECONDS)); // prints 91
   System.out.println(blockingDeque.pollFirst(200, TimeUnit.NANOSECONDS)); // prints 47
   System.out.println(blockingDeque.pollLast(1, TimeUnit.SECONDS)); // prints 3
} catch (InterruptedException e) {
   // Handle interruption
}

// Obtaining Synchronized Collections:
// Besides the concurrent collection classes covered above, the Concurrency API also includes methods for obtaining synchronized versions of existing non-concurrent collection objects. These static methods, defined in the Collections class, wrap the regular (unsynchronized) collection passed in and return a reference of the same interface type whose methods are synchronized:
synchronizedCollection(Collection<T> c)
synchronizedList(List<T> list)
synchronizedMap(Map<K,V> m)
synchronizedNavigableMap(NavigableMap<K,V> m)
synchronizedNavigableSet(NavigableSet<T> s)
synchronizedSet(Set<T> s)
synchronizedSortedMap(SortedMap<K,V> m)
synchronizedSortedSet(SortedSet<T> s)
// Example:
Map<String, Object> foodData = new HashMap<String, Object>();
foodData.put("meat", 1);
foodData.put("egg", 2);
Map<String,Object> synchronizedFoodData = Collections.synchronizedMap(foodData);
// If you know at the time of creation that your object requires synchronization, then you should use one of the concurrent collection classes listed above. On the other hand, if you are given an existing collection that is not a concurrent class and need to access it from multiple threads, you can wrap it using one of the Collections.synchronizedXxx() methods listed above.
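
// A minimal sketch of wrapping an existing list, under assumed names (SynchronizedWrapperDemo and fillFromThreads() are hypothetical). Individual calls such as add() are synchronized by the wrapper, but the Collections documentation requires the caller to synchronize manually on the wrapper while iterating over it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SynchronizedWrapperDemo {
   // Hypothetical helper: each of 'threads' threads adds 'perThread' numbers to one wrapped list.
   static int fillFromThreads(int threads, int perThread) {
      List<Integer> numbers = Collections.synchronizedList(new ArrayList<>());
      Thread[] workers = new Thread[threads];
      for (int i = 0; i < threads; i++) {
         workers[i] = new Thread(() -> {
            for (int j = 0; j < perThread; j++) numbers.add(j); // add() is synchronized by the wrapper
         });
         workers[i].start();
      }
      for (Thread t : workers) {
         try {
            t.join();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
         }
      }
      int count = 0;
      synchronized (numbers) { // iteration spans many calls, so lock the wrapper for its whole duration
         for (Integer ignored : numbers) count++;
      }
      return count;
   }

   public static void main(String[] args) {
      System.out.println(fillFromThreads(4, 1000)); // prints 4000
   }
}
```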