Programing Language/JAVA

JAVA Concurrent 프로그래밍 (Completable Future)

칼쵸쵸 2022. 11. 6. 14:49

1. 기본적인 쓰레드 예시

System.out.println(Thread.currentThread().getName());
Thread a = new Thread(()->
{
    System.out.println(Thread.currentThread().getName());
    try {
        Thread.sleep(1000l);
    } catch (InterruptedException e) {
        return;
    }
});

a.start();
// my code

// a.join() or a.interrupt()

// my code

start : 시작

join : 해당 쓰레드가 끝날때 까지 기다림

interrupt: 해당 쓰레드를 interrupt 하여 catch 블락에 접근하도록 유도

=> 쓰레드 관련하여 모든 코드 작성,관리하기 어려움

 

2. Executors

: 쓰레드 생성,관리 api 제공

1) 쓰레드 팩토리 생성

import java.util.concurrent.ThreadFactory;

public class CustomThreadFactory implements ThreadFactory {
    String threadname;
    int threadnum = 1;
    public CustomThreadFactory(String threadname)
    {
        this.threadname = threadname;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(threadname+this.threadnum);
        this.threadnum += 1;
        return thread;
    }
}

2) 실제 쓰레드 생성, 관리

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(()-> System.out.println(Thread.currentThread().getName()));
executorService.shutdown();

ExecutorService executorService2 = Executors.newFixedThreadPool(5,new CustomThreadFactory("Custom Thread"));
executorService2.submit(()-> System.out.println(Thread.currentThread().getName()));
executorService2.submit(()-> System.out.println(Thread.currentThread().getName()));
executorService2.submit(()-> System.out.println(Thread.currentThread().getName()));
executorService2.shutdown();

//        결과
//        pool-1-thread-1
//        Custom Thread1
//        Custom Thread2
//        Custom Thread3

 

1) newSingleThreadExcutor : 1개의 쓰레드 풀로 관리

2) newFixedThreadPool : 지정된 쓰레드 풀 크기로 관리, 쓰레드 팩토리를 지정해서 생성,관리하는 쓰레드마다 특정 처리 가능

=> return 값이 필요한 경우 위와 같은 Runable(void)로 처리 불가

=> Callable 타입으로 생성, 처리하면 return 값 처리 가능 (Future)

 

3. Callable, Future

ExecutorService executorService = Executors.newSingleThreadExecutor();


Callable<String> cl = () -> {
    Thread.sleep(1000l);
    return "gg";
};
Future<String> f  = executorService.submit(cl);

while(true)
{
    System.out.println(f.isDone());
    if(f.isDone())
    {
        System.out.println(f.get());
        break;
    }
}
// f.get(1,TimeUnit.SECONDS);
//timeout 설정 가능

1. callable : return 값이 있는 쓰레드 실행

2. future : callable로 생성한 리턴값

- get : 해당 return값을 받을때 까지 기다림 , time out 설정가능

- cancle : 해당 쓰레드 중단시킴

- isdone : 해당 쓰레드가 끝났는지 확인

=> 예외 처리 불가

=> future 간 조합 어려움

=> future가 끝난후에 작업을 바로 아래 코드단에 적을수가 없음

4. CompletableFuture

//Excutors 사용 가능
//기본적으로 자바 내부의 ForkJoin CommonPool 사용 (deque를 사용,알아서 다른쓰레드에 할당)
ExecutorService executorService = Executors.newFixedThreadPool(3);

CompletableFuture<Void> f2  = CompletableFuture.supplyAsync(()->
{
    return Thread.currentThread().getName();
},executorService).thenApply(x->{
    System.out.println("1 : " + x);
    return x.toUpperCase();
}).thenAcceptAsync((x2) -> {
    System.out.println("2 : " + x2);
},executorService).thenRunAsync(()->
{
    System.out.println("3 : done");
    System.out.println("4 : " + Thread.currentThread().getName());
});
f2.get();
executorService.shutdown();

결과

1 : pool-1-thread-1
2 : POOL-1-THREAD-1
3 : done
4 : ForkJoinPool.commonPool-worker-3

CompletableFuture 객체를 사용하여 결과값을 받은후의 동작을 바로 밑에 작성가능

- run , accept, apply 와 같은 function을 정의하여 사용가능

- 기본적으로 자바 내부 쓰레드 풀을 사용하며 필요시 정의한 쓰레드풀 사용가능

- 각 동작마다 Async로 다른 쓰레드에서 동작하게 할수 있음

- Combine , Compose로 각각 다른 쓰레드의 동작을 조합하여 사용 가능

//Combine
CompletableFuture<String> f1  = CompletableFuture.supplyAsync(()->
{
    System.out.println(Thread.currentThread().getName());
    return Thread.currentThread().getName();
});

CompletableFuture<String> f2  = CompletableFuture.supplyAsync(()->
{
    System.out.println(Thread.currentThread().getName());
    return Thread.currentThread().getName();
});

CompletableFuture<String> f3 = f1.thenCombine(f2,(x1,x2)->
{
    return "Combine : " + x1 + "," +x2 + " are done";
});
System.out.println(f3.get());


//Compose
CompletableFuture<String> f4  = CompletableFuture.supplyAsync(()->
{
    return Thread.currentThread().getName();
});

CompletableFuture<String> f5 = f4.thenCompose((x)->
        CompletableFuture.supplyAsync(()->
                "Compose : " + x + "," +Thread.currentThread().getName() + " are done"));

System.out.println(f5.get());

결과

ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-5
Combine : ForkJoinPool.commonPool-worker-3,ForkJoinPool.commonPool-worker-5 are done
Compose : ForkJoinPool.commonPool-worker-3,ForkJoinPool.commonPool-worker-3 are done

Combine : 각 Future를 각각 동작시켜 조합하여 새로운 결과를 내는 Future 생성

Compose : 정해진 순서대로 Future를 return 후 해당 값을 사용하는 다른 쓰레드를 연결하여 새로운 결과를 내는 Future생성

- Future의 결과값을 받아 CompletableFuture를 return하는 Function 구현 

'Programing Language > JAVA' 카테고리의 다른 글

JAVA 바이트 코드 조작  (0) 2024.04.16
Collection과 람다식을 활용한 데이터 처리 정리 - 1  (0) 2022.05.12
JAVA Thread 구현하기  (0) 2021.02.25
Enumeration과 Iterator  (0) 2021.02.25
JAVA Exception 처리  (0) 2021.02.23