Catatan kajian multithreading Java (Cave Of Programming - Java Multi-Threading)
Java Multithreading Study Notes
1. Memulakan Benang
Dua cara memulakan benang (thread):
a. meluaskan kelas Thread dan meng-override kaedah run()
b. melaksanakan antara muka Runnable dan menghantarnya sebagai parameter konstruktor Thread
/** Variant (a): the thread body is supplied by subclassing {@link Thread} and overriding {@code run()}. */
class RunnerThread extends Thread {
    @Override
    public void run() {
        // to do some action
    }
}

/** Variant (b): the thread body is a {@link Runnable} that is handed to a {@link Thread} constructor. */
class RunnerRunnable implements Runnable {
    @Override
    public void run() {
        // to do some action
    }
}

/** Starts one thread for each way of defining a thread body. */
public class BasicApp {
    public static void main(String[] args) {
        // (a) subclass of Thread
        RunnerThread t1 = new RunnerThread();
        t1.start();

        // (b) named Runnable passed to the Thread constructor
        RunnerRunnable runnerRunnable = new RunnerRunnable();
        Thread t2 = new Thread(runnerRunnable);

        // (b) again, with an anonymous Runnable
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                // to do some action
            }
        });

        t2.start();
        t3.start();
    }
}
2. Asas Penyegerakan Benang (Basic Thread Synchronization)
Kata kunci volatile memastikan bahawa nilai dalam cache dikemas kini
import java.util.Scanner;

/** Prints "Running" every 500 ms until its flag is cleared or its thread is interrupted. */
class Runner implements Runnable {
    // volatile: the flag is written by one thread and read by another in the loop condition.
    volatile boolean runFlag = true;

    @Override
    public void run() {
        while (runFlag) {
            System.out.println("Running");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Treat interruption as a stop request. Looping on would make every
                // following sleep() fail immediately once the flag is restored.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Sets the loop flag.
     *
     * @param runFlag {@code false} makes {@link #run()} leave its loop at the next check
     */
    public void setRunFlag(boolean runFlag) {
        this.runFlag = runFlag;
    }
}

/** Runs a {@link Runner} in a thread and stops it once a line is read from standard input. */
public class App2 {
    public static void main(String[] args) {
        Scanner scan = new Scanner(System.in);
        Runner runner = new Runner();
        Thread t1 = new Thread(runner);
        t1.start();
        try {
            scan.nextLine();
        } finally {
            // Also reached when nextLine() throws, so the worker never outlives main.
            runner.setRunFlag(false);
            scan.close();
        }
    }
}
3. Kata kunci synchronized
/** Two threads increment one shared counter 1000 times each through a synchronized method. */
public class App3 {
    private int count = 0;

    /** Adds one to the counter while holding this object's monitor. */
    private synchronized void increment() {
        count++;
    }

    public static void main(String[] args) throws InterruptedException {
        App3 app3 = new App3();
        app3.doWork();
    }

    /**
     * Runs both incrementing threads to completion and prints the counter.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private void doWork() throws InterruptedException {
        // Both threads execute the same loop, so one Runnable instance serves both.
        Runnable thousandIncrements = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    increment();
                }
            }
        };
        Thread t1 = new Thread(thousandIncrements);
        Thread t2 = new Thread(thousandIncrements);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Read only after both joins, i.e. after both threads have finished.
        System.out.println("Count=" + count);
    }
}
4. Banyak Kunci Menggunakan Blok Kod Disegerakkan
/**
 * Appends random numbers to two lists. Each list has its own lock object, so a thread
 * adding to list1 does not block a thread adding to list2.
 */
class Process implements Runnable {
    private ArrayList<Integer> list1 = new ArrayList<Integer>();
    private ArrayList<Integer> list2 = new ArrayList<Integer>();
    private Random random = new Random();
    private final Object obj1 = new Object();
    private final Object obj2 = new Object();

    private void addList1() throws InterruptedException {
        synchronized (obj1) {
            list1.add(random.nextInt(100));
        }
        // Sleep outside the synchronized block so the lock is not held meanwhile.
        Thread.sleep(1);
    }

    private void addList2() throws InterruptedException {
        synchronized (obj2) {
            list2.add(random.nextInt(100));
        }
        Thread.sleep(1);
    }

    /** Performs 1000 rounds of one add per list; stops early if the thread is interrupted. */
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            try {
                addList1();
                addList2();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop working.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Prints both list sizes. Not synchronized: intended to be called after the workers finished. */
    public void showSize() {
        System.out.println("List1:" + list1.size() + ",List2:" + list2.size());
    }
}

/** Runs one {@link Process} on two threads and reports list sizes and elapsed time. */
public class App4 {
    public static void main(String[] args) throws InterruptedException {
        // Timed with System.nanoTime() so the example needs nothing beyond the JDK.
        long startNanos = System.nanoTime();
        Process proc = new Process();
        Thread t1 = new Thread(proc);
        Thread t2 = new Thread(proc);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
        proc.showSize();
        System.out.println(String.format("App4 takes %d miliseconds", elapsedMillis));
    }
}
Hasil operasi:
List1:2000,List2:2000
App4 takes 2016 miliseconds
Hasil tanpa kata kunci synchronized:
List1:1981,List2:1988
App4 takes 2018 miliseconds
Hasil selepas menambah kata kunci synchronized pada kaedah:
List1:2000,List2:2000
App4 takes 4093 miliseconds
private synchronized void addList1() throws InterruptedException{ list1.add(random.nextInt(100)) Thread.sleep(1) } private synchronized void addList2() throws InterruptedException{ list2.add(random.nextInt(100)) Thread.sleep(1) }
5. Kolam Benang
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** A task that announces its start, sleeps for two seconds and announces completion. */
class Runner1 implements Runnable {
    private final int id;

    public Runner1(final int i) {
        this.id = i;
    }

    @Override
    public void run() {
        System.out.println("Starting:" + id);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Interrupted mid-sleep: keep the interrupt visible and do not claim completion.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("Complete:" + id);
    }
}

/** Submits five tasks to a pool of two threads and waits for them to finish. */
public class App5 {
    public static void main(String[] args) throws InterruptedException {
        // One pool with 2 worker threads; the workers are reused across the 5 tasks.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            executor.submit(new Runner1(i)); // submit task
        }
        // No further submissions are accepted; tasks already submitted still run.
        executor.shutdown();
        System.out.println("All tasks submitted.");
        // awaitTermination only bounds how long main waits. It stops nothing, and
        // returns false when the time runs out before the pool has terminated.
        if (executor.awaitTermination(1, TimeUnit.MINUTES)) {
            System.out.println("All tasks completed.");
        } else {
            System.out.println("Timed out before all tasks completed.");
        }
    }
}
6. CountDownLatch
/** A task that prints the latch's current count and then counts it down by one. */
class Process implements Runnable {
    private final CountDownLatch latch;

    public Process(final CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("Start, count=" + latch.getCount());
        latch.countDown();
    }
}

/** Submits 20 counting tasks to a pool of 3 threads and waits until the latch reaches zero. */
public class App6 {
    public static void main(String[] args) throws InterruptedException {
        // CountDownLatch is thread-safe, so all tasks share it without extra locking.
        CountDownLatch latch = new CountDownLatch(9);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 20; i++) {
            executor.submit(new Process(latch));
        }
        executor.shutdown();
        // Blocks until the latch count reaches 0; main does not wait for the remaining tasks.
        latch.await();
        System.out.println("Complete.");
    }
}
hasil operasi:
Start, count=9
Start, count=8
Start, count=7
Start, count=6
Start, count=5
Start, count=4
Start, count=3
Start, count=2
Start, count=1
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Start, count=0
Complete. // Selalu dicetak selepas kiraan = 0
Start, count=0

7. Pengeluar-Pengguna (Producer-Consumer)
BlockingQueue adalah selamat untuk benang (thread-safe): put() akan menunggu apabila saiz maksimum telah dicapai, dan take() akan menunggu apabila saiznya 0
/**
 * Producer/consumer pair on a bounded {@link BlockingQueue}: the producer puts a random
 * number every 500 ms; the consumer wakes every 100 ms and takes a value on average
 * one time in ten.
 */
public class App7 {
    // Typed queue: with a raw BlockingQueue, take() returns Object and cannot be assigned to int.
    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

    private static void producer() throws InterruptedException {
        Random random = new Random();
        while (true) {
            Thread.sleep(500);
            queue.put(random.nextInt(100));
        }
    }

    private static void consumer() throws InterruptedException {
        Random random = new Random();
        while (true) {
            Thread.sleep(100);
            if (random.nextInt(10) == 0) {
                int value = queue.take();
                System.out.println("Taken value=" + value + ", Queue Size=" + queue.size());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    producer();
                } catch (InterruptedException e) {
                    // The loop ends here; keep the interrupt status instead of discarding it.
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
8. wait dan notify
package demo1;

import java.util.Scanner;

/** Demonstrates {@code wait()}/{@code notify()} on a shared object. */
class Processor8 {
    // Guarded by this. Set before notify() so the waiter can tell a real wake-up from a
    // spurious one, and does not wait forever if the notification came first.
    private boolean resumed = false;

    /**
     * Blocks until {@link #consumer()} has signalled.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void producer() throws InterruptedException {
        System.out.println("Thread start...");
        synchronized (this) {
            while (!resumed) {
                wait();
            }
        }
        System.out.println("Resumed.");
    }

    /**
     * Waits for a line on standard input, then signals the waiting thread.
     *
     * @throws InterruptedException if interrupted while sleeping
     */
    public void consumer() throws InterruptedException {
        Thread.sleep(1000);
        Scanner scan = new Scanner(System.in);
        try {
            System.out.println("Press enter to be continue...");
            scan.nextLine();
            synchronized (this) {
                resumed = true;
                // Wakes a thread waiting on this object. The woken thread can only continue
                // once this monitor is released, i.e. after the sleep below.
                notify();
                Thread.sleep(2500);
            }
        } finally {
            scan.close();
        }
    }
}

/** Runs producer() and consumer() of one {@link Processor8} on two threads. */
public class App8 {
    public static void main(String[] args) throws InterruptedException {
        final Processor8 proc = new Processor8();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    proc.producer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    proc.consumer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
9.Contoh Berfungsi Menggunakan Penyegerakan Tahap Rendah
/**
 * Hand-written bounded buffer: the producer appends increasing integers until the list
 * holds LIMIT items, the consumer removes them from the front; both coordinate with
 * wait()/notify() on one lock object.
 */
class Processor9 {
    private static final int LIMIT = 10;
    // Typed list: with a raw LinkedList, removeFirst() returns Object and cannot be assigned to int.
    private final LinkedList<Integer> list = new LinkedList<Integer>();
    private final Object lock = new Object();
    private final Random random = new Random();

    /**
     * Appends 0, 1, 2, ... forever, waiting while the list is full.
     *
     * @throws InterruptedException if interrupted while waiting for free space
     */
    public void producer() throws InterruptedException {
        int value = 0;
        while (true) {
            synchronized (lock) {
                System.out.println("producer");
                // Loop, not if: the condition must be re-checked after every wake-up.
                while (list.size() == LIMIT) {
                    lock.wait();
                }
                list.add(value++);
                lock.notify();
            }
        }
    }

    /**
     * Removes and prints the oldest value forever, pausing a random time (under one
     * second) between removals and waiting while the list is empty.
     *
     * @throws InterruptedException if interrupted while waiting or sleeping
     */
    public void consumer() throws InterruptedException {
        while (true) {
            synchronized (lock) {
                System.out.println("consumer");
                while (list.size() == 0) {
                    lock.wait();
                }
                System.out.print("List size:" + list.size());
                int value = list.removeFirst();
                System.out.println(", taken value:" + value);
                lock.notify();
            }
            // Sleep outside the synchronized block so the producer can run meanwhile.
            Thread.sleep(random.nextInt(1000));
        }
    }
}

/** Runs producer() and consumer() of one {@link Processor9} on two threads. */
public class App9 {
    public static void main(String[] args) throws InterruptedException {
        final Processor9 proc = new Processor9();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    proc.producer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    proc.consumer();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
10. ReentrantLock (Kunci Masuk Semula)
import java.util.Scanner;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Demonstrates {@link ReentrantLock} with a {@link Condition} in place of synchronized/wait/notify. */
class Processor10 {
    private final ReentrantLock lock = new ReentrantLock();
    final Condition cond = lock.newCondition();
    // Guarded by lock. Set before signal() so the waiter can tell a real wake-up from a
    // spurious one, and does not wait forever if the signal came first.
    private boolean signalled = false;
    private int count = 0;

    /** Adds 100,000,000 to the counter. Both callers in this class invoke it while holding the lock. */
    public void increment() {
        for (int i = 0; i < 100000000; i++) {
            count++;
        }
    }

    /** Waits on the condition until signalled, then increments. */
    public void firstThread() {
        lock.lock(); // plays the role of the synchronized keyword
        try {
            System.out.println("Waiting...");
            while (!signalled) {
                cond.await(); // plays the role of Object.wait()
            }
            System.out.println("Wake up...");
            increment();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("Error1:" + e.getMessage());
        } finally {
            lock.unlock();
        }
    }

    /** After one second, takes the lock, waits for a line on standard input, signals and increments. */
    public void secondThread() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // The lock is not held yet, so there is nothing to unlock on this path.
            Thread.currentThread().interrupt();
            System.out.println("Error2:" + e.getMessage());
            return;
        }
        // Acquire outside the try: finally must only unlock a lock that is actually held.
        lock.lock(); // plays the role of the synchronized keyword
        Scanner scan = null;
        try {
            System.out.println("Press enter to continue...");
            scan = new Scanner(System.in);
            scan.nextLine();
            signalled = true;
            cond.signal(); // plays the role of Object.notify()
            increment();
        } catch (RuntimeException e) {
            System.out.println("Error2:" + e.getMessage());
        } finally {
            lock.unlock();
            if (scan != null) {
                scan.close();
            }
        }
    }

    /** Prints the counter. */
    public void finish() {
        System.out.println("Count is:" + count);
    }
}

/** Runs firstThread() and secondThread() of one {@link Processor10} on two threads, then prints the counter. */
public class App10 {
    public static void main(String[] args) throws InterruptedException {
        final Processor10 proc = new Processor10();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                proc.firstThread();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                proc.secondThread();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        proc.finish();
    }
}
11. Deadlock (Kunci mati)
/** A balance starting at 10000. It does no locking of its own. */
class Account {
    private int deposit = 10000;

    private void deposit(final int amount) {
        this.deposit += amount;
    }

    private void withdraw(final int amount) {
        this.deposit -= amount;
    }

    /**
     * Moves {@code amount} from {@code acc1} to {@code acc2}. No balance check is made.
     *
     * @param acc1   account to withdraw from
     * @param acc2   account to deposit into
     * @param amount amount to move
     */
    public static void transfer(final Account acc1, final Account acc2, final int amount) {
        acc1.withdraw(amount);
        acc2.deposit(amount);
    }

    public int getDeposit() {
        return this.deposit;
    }
}

/**
 * Two workers transfer money in opposite directions and ask for the two locks in opposite
 * order. The locks are taken with tryLock and released on failure, so neither worker ever
 * blocks while holding one lock.
 */
class Runner11 {
    Random random = new Random();
    Account acc1 = new Account();
    Account acc2 = new Account();
    Lock lock1 = new ReentrantLock();
    Lock lock2 = new ReentrantLock();

    /** Performs 1,000,000 random transfers from acc1 to acc2; stops early if interrupted. */
    public void firstThread() {
        for (int i = 0; i < 1000000; i++) {
            if (!transferUnderLocks(lock1, lock2, acc1, acc2)) {
                return;
            }
        }
    }

    /** Performs 1,000,000 random transfers from acc2 to acc1; stops early if interrupted. */
    public void secondThread() {
        for (int i = 0; i < 1000000; i++) {
            if (!transferUnderLocks(lock2, lock1, acc2, acc1)) {
                return;
            }
        }
    }

    /** Takes both locks, transfers a random amount below 1000, releases both; false if interrupted. */
    private boolean transferUnderLocks(Lock first, Lock second, Account from, Account to) {
        try {
            accquireLock(first, second);
        } catch (InterruptedException e) {
            // No lock is held on this path, so nothing may be unlocked.
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            Account.transfer(from, to, random.nextInt(1000));
        } finally {
            first.unlock();
            second.unlock();
        }
        return true;
    }

    /** Prints both balances and their sum. */
    public void finish() {
        System.out.println("Account1 Amount:" + acc1.getDeposit());
        System.out.println("Account2 Amount:" + acc2.getDeposit());
        System.out.println("Total Amount:" + (acc1.getDeposit() + acc2.getDeposit()));
    }

    /**
     * Returns only once the calling thread holds both locks. If only one could be taken it
     * is released again, so other threads can proceed, and the attempt is repeated after 10 ms.
     *
     * @param lock1 first lock to try
     * @param lock2 second lock to try
     * @throws InterruptedException if interrupted while pausing between attempts; no lock is held then
     */
    public void accquireLock(Lock lock1, Lock lock2) throws InterruptedException {
        while (true) {
            // Fresh results on every attempt, so no stale status can trigger an unlock.
            boolean lock1Status = lock1.tryLock();
            boolean lock2Status = lock2.tryLock();
            if (lock1Status && lock2Status) {
                return;
            }
            if (lock1Status) {
                lock1.unlock();
            }
            if (lock2Status) {
                lock2.unlock();
            }
            Thread.sleep(10);
        }
    }
}

/** Runs both workers of one {@link Runner11} concurrently and prints the final balances. */
public class App11 {
    public static void main(String[] args) throws InterruptedException {
        final Runner11 runner = new Runner11();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                runner.firstThread();
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                runner.secondThread();
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        runner.finish();
    }
}
12. Semaphores
Semaphore (semaphore) digunakan untuk mengawal bilangan utas yang mengakses sumber tertentu pada masa yang sama. Ini menyelaraskan pelbagai utas untuk memastikan penggunaan sumber bersama yang wajar
/** Simulated connection pool: a semaphore with 10 permits bounds the simultaneous connections. */
class Runner12 {
    private int connCnt = 0;
    private Semaphore sem = new Semaphore(10); // at most 10 connections at the same time

    /** Takes a permit, holds a simulated connection for 50 ms, then gives the permit back. */
    public void getConnection() {
        try {
            sem.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so none may be released.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            int current;
            synchronized (this) {
                // Copy the value inside the monitor; the field is not read outside it.
                current = ++connCnt;
            }
            System.out.println("Current connection is :" + current);
            try {
                Thread.sleep(50);
            } finally {
                // Runs on interruption too, so the counter cannot drift upwards.
                synchronized (this) {
                    connCnt--;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sem.release();
        }
    }
}

/** Submits 1000 getConnection() calls to a pool of 10 threads and waits up to 100 seconds. */
public class App12 {
    public static void main(String[] args) throws InterruptedException {
        final Runner12 runner = new Runner12();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    runner.getConnection();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(100, TimeUnit.SECONDS);
    }
}
13. Callable dan Future
/** Submits one {@link Callable} that sleeps a random time below 4 s and prints the value it returns. */
public class App13 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final Random random = new Random();
        ExecutorService executor = Executors.newCachedThreadPool();
        // Unlike Runnable, a Callable returns a value and may throw a checked exception.
        // The type argument makes future.get() return Integer instead of Object.
        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws InterruptedException {
                int duration = random.nextInt(4000);
                System.out.println("Start...");
                Thread.sleep(duration);
                System.out.println("Done.");
                return duration;
            }
        });
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        // The Future gives access to the task's result (and to its status).
        System.out.println("Takes " + future.get() + " miliseconds.");
    }
}
14. Menginterupsi Benang (Interrupting Threads)
a. gunakan kaedah Thread.interrupt()
/** Interrupts a busy worker thread directly with {@link Thread#interrupt()}. */
public class App14 {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start...");
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                Random random = new Random();
                for (int i = 0; i < 1E8; i++) {
                    // The loop never blocks, so it has to poll the interrupt status itself.
                    if (Thread.interrupted()) {
                        System.out.println("Interrupted...");
                        break;
                    }
                    Math.sin(random.nextDouble()); // busy work; the result is not used
                }
            }
        });
        t.start();
        t.interrupt();
        t.join();
        System.out.println("End.");
    }
}
b. gunakan future.cancel()
/** Cancels a busy task through its {@link Future}. */
public class App14 {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start...");
        ExecutorService exec = Executors.newCachedThreadPool();
        Future<Void> fu = exec.submit(new Callable<Void>() {
            @Override
            public Void call() {
                Random random = new Random();
                for (int i = 0; i < 1E8; i++) {
                    // The loop never blocks, so it has to poll the interrupt status itself.
                    if (Thread.interrupted()) {
                        System.out.println("Interrupted...");
                        break;
                    }
                    Math.sin(random.nextDouble()); // busy work; the result is not used
                }
                return null;
            }
        });
        exec.shutdown();
        // true: a task that is already running is interrupted as well.
        // false: only a task that has not started yet is cancelled.
        fu.cancel(true);
        exec.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("End.");
    }
}
c. gunakan ExecutorService.shutdownNow()
/** Stops a busy task by shutting its executor down with {@link ExecutorService#shutdownNow()}. */
public class App14 {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Start...");
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.submit(new Callable<Void>() {
            @Override
            public Void call() {
                Random random = new Random();
                for (int i = 0; i < 1E8; i++) {
                    // The loop never blocks, so it has to poll the interrupt status itself.
                    if (Thread.interrupted()) {
                        System.out.println("Interrupted...");
                        break;
                    }
                    Math.sin(random.nextDouble()); // busy work; the result is not used
                }
                return null;
            }
        });
        // shutdownNow() alone both rejects new tasks and interrupts running ones, so a
        // preceding shutdown() call adds nothing.
        exec.shutdownNow();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("End.");
    }
}