[Springboot] Sse(Server Send Event) 단방향 통신을 이용해 tail -f 기능 구현
난 이걸 왜 쓰게되었나?
OCP 웹콘솔에 보면 pod의 log를 지속적으로 호출하는 페이지가 있는데, 말 그대로 서버가 클라이언트 쪽에 로그를 일방적으로 보내는 방식인거 같았고, 이걸 구현해보고 싶다는 생각이 들어 통신모듈을 찾아보다가 SSE(Server Send Event)라는 라이브러리를 알게되어 찾아보니 보통 접속중인 사용자에게 push 알림을 보내는 용도, 스포츠 중계서비스에 이용할 수 있다. 필자는 리눅스에서 로그 파일을 볼 때 이용하는 tail -f 기능으로 활용해보려한다.
단방향 통신을 위한 모듈
SeeEmitter 클래스는 2015년(Spring Framework 4.2)부터 사용할 수 있게 되었다.
Internet Exporer를 제외한 모든 브라우저를 지원한다.
HTTP/1.1 프로토콜 사용시 브라우저에서 1개 도메인에 대해 생성할 수 있는 EventStream은 최대 6개이다.
HTTP/2 프로토콜 사용시 브라우저와 서버간 최대 100개까지 유지 가능하다.
주의사항
JPA 설정 시
SSE 통신을 사용하는 동안 HTTP Connection이 계속 열려있으므로 응답 API쪽에서 JAP를 사용하고 open-in-view 속성이 true로 되어있다면, DB Connection도 같이 열려있게되니 반드시 해당 속성을 false로 해야한다
Nginx 리버스 프록시
위 설정을 사용한다면 아래와 같은 세팅이 적용된다.
1. 기본적으로 Upstream 요청 시 HTTP/1.0 버전을 사용함
2. Nnginx에서 Connectiob: close 헤더를 사용함
이를 해결하기 위해 아래설정을 추가하자.
proxy_set_header Connection ''; proxy_http_version 1.1; |
그 외에도 proxy buffering 기능을 비활성화하여 SSE 응답에 대해 즉각적인 대응이 되도록 해야한다. 이 부분은 SSE 응답 API 쪽에서 헤더에 X-Accel-Buffering: no 를 붙여주면 SSE 응답만 버퍼링을 하지 않도록 할 수 있다고 한다. (자세한 설명은 참고 링크를 확인)
Scale out 시 문제
SseEmitter 객체는 서버 메모리에 저장되어 있어 인스턴스를 늘리는 경우 커넥션을 해당 컨테이너에 유지시키는 방식을 써야한다. 해싱방식을 쓸 순 있지만, Redis 같은 메시지 브로커나 분산 캐시를 통해 문제를 해결할 수 있다고 한다.
구현
application.properties
로그 파일을 파일로 저장하고 롤링, 보관주기를 설정하자.
스프링 application 실행 위치에 myapp.log 로 생성되게 하였다.
logging.file.path=.
logging.file.name=myapp.log
logging.pattern.rolling-file-name=myapp-%d{yyyy-MM-dd}.%i.log
logging.file.max-history=2
logging.file.max-size=1MB
logging.file.total-size-cap=10MB
logging.file.clean-history-on-start=true
sse.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<input id="input"/>
<button id="send">로그파일 읽기</button>
<div id="messages"></div>
<script>
const eventSource = new EventSource(`/subscribe?id=${Math.random()}`);
eventSource.onopen = (e) => { console.log(e); };
eventSource.onerror = (e) => { console.log(e); };
eventSource.onmessage = (e) => {
document.querySelector("#messages").appendChild(document.createTextNode(e.data + "\n"));
};
document.querySelector("#send").addEventListener("click", () => {
fetch(`/publish?message=${document.querySelector("#input").value}`);
});
</script>
</body>
</html>
페이지가 로딩되면 바로 /subscribe?id=랜덤값을 보내어 파일쓰레드생성을 요청할 것이다..
Input 태그를 넣어두었는데 이부분을 활용하면 접속한 세션 전체에 메시지를 전달하도록 할 수 있다.
파일 읽기
파일을 읽어 sse 객체에 send 해준다.
@Slf4j
public class FileThread extends Thread{
private static final int DELAY_MILLIS = 1000;
private boolean isRun;
//대상 파일
private final File file;
private final Map<String, SseEmitter> sse;
public FileThread(File file, Map<String, SseEmitter> sse) {
this.file = file;
this.sse = sse;
}
@Override
public void run() {
isRun = true;
if (!file.exists()) {
System.out.println("Failed to find a file - " + file.getPath());
}
//try 문에서 Stream을 열면 블럭이 끝났을 때 close를 해줌
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
while (isRun) {
//readLine(): 파일의 한 line을 읽어오는 메소드
final String le = br.readLine();
if (le != null) {
// System.out.println("New line added - " + le);
sse.forEach((id, emitter) -> {
try {
emitter.send(le, MediaType.TEXT_PLAIN);
} catch (Exception e) {
// deadIds.add(id);
log.info("disconnected id : " + id);
}
});
} else {
Thread.sleep(DELAY_MILLIS);
}
}
} catch (Exception e) {
System.out.println("Failed to tail a file - " + file.getPath());
}
System.out.println("Stop to tail a file - " + file.getName());
}
}
Sse Controller
GET /tails url 에 접근하면 위에서 생성한 sse.html 을 출력하고
GET /subscribe?id=랜덤값 을 통해 sse객체에 id 값을 맵핑한다.
GET /publish 는 파일쓰레드를 생성하여 지속적으로 클라이언트에게 메시지를 보내게 된다.
@Slf4j
@Controller
public class LogsSse {
private static final Map<String, SseEmitter> CLIENTS = new ConcurrentHashMap<>();
@GetMapping(value="/tails")
public String page() {
log.info("springboot application log tails -f page");
return "html/sse";
}
@GetMapping(value="/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(String id) throws IOException {
SseEmitter emitter = new SseEmitter();
CLIENTS.put(id, emitter);
emitter.onTimeout(() -> CLIENTS.remove(id));
emitter.onCompletion(() -> CLIENTS.remove(id));
emitter.send("서버와 연결되었습니다. ID(" + id + ")", MediaType.TEXT_PLAIN);
log.info("서버와 연결되었습니다. ID(" + id + ")");
return emitter;
}
@GetMapping(value="/publish")
public @ResponseBody HashMap<String, Object> publish(String message) {
HashMap<String,Object> map = new HashMap<String,Object>();
Set<String> deadIds = new HashSet<>();
CLIENTS.forEach((id, emitter) -> {
try {
// emitter.send(message, MediaType.TEXT_PLAIN);
log.info("전송ID : " + id + ", 내용 : " + message);
fileGet();
} catch (Exception e) {
deadIds.add(id);
log.info("disconnected id : " + id);
thread.interrupt();
}
});
map.put("status","success");
deadIds.forEach(CLIENTS::remove);
return map;
}
FileThread watcher = null;
Thread thread = null;
// 스프링부트 로그를 읽어오기
public void fileGet() {
if(thread==null) {
File file = new File("myapp.log");
watcher = new FileThread(file, CLIENTS);
thread = new Thread(watcher);
thread.setDaemon(true);
thread.start();
}
}
}
테스트
작성한 /tails 페이지에 접근해보자.
페이지접근 시 sse.html 의 eventSource를 통해 /subscribe 페이지에 랜덤한 id값을 보내어 sseEmitter 객체를 생성하게 되고, [로그파일 읽기] 버튼을 누르면 쓰레드를 생성해 file을 지속적으로 읽어 sseEmitter 객체에 들어온 클라이언트들에게 로그를 보낸다.
여러 브라우저를 동시에 켜서 확인할 수 있고, 접속한 랜덤한 ID 별로 데이터를 전달하게된다.
마치며
통신에 대한 부분은 항상 소켓만 생각했었고, 소켓으로만 구현해보았었는데,
찾아보니 역시 용도에 따라 라이브러리가 이미 존재했다..ㅋ
ocp 콘솔을 쓰면서 항상 logs 메뉴를 자주 눌렀었는데 이번 기회를 통해 재밌는 기능을 구현해봐서 재밌었다.
참고블로그
https://jsonobject.tistory.com/558
https://tecoble.techcourse.co.kr/post/2022-10-11-server-sent-events/