Spring Server에서 Push 알림 제공하기 (SSE)
본 내용은 프로젝트 및 학습을 통해 알게된 내용을 정리한 것이라 실제 정확한 정보가 아닐 수 있음을 미리 알려드립니다.
개요
취업을 준비하는 과정에서 모임 및 커뮤니티 기능을 포함한 웹 SNS 서비스 프로젝트를 진행했다.
SNS 서비스는 다른 사용자간의 상호작용이 중요하다 판단해 PUSH 알림
이 반드시 필요하다 생각해 이 기능은 반드시 필요하다 생각하게 되었다.
SSAFY입과 초기에 클라이언트와 서버간 데이터 통신 방식을 잘 몰랐을 땐 AJAX는 신이야! 를 외쳤지만 WebSocket, SSE를 알아가며 상황에 맞게 구현해보고자 한다.
구현에 앞서 학습한 간략한 내용들
이벤트를 요청하는 다양한 방법
구분 | Polling | WebSocket | SSE |
---|---|---|---|
- 클라이언트가 서버에게 요청해서 데이터를 받아오는 방식 - 클라이언트는 서버의 상태를 알지 못하기 때문에 계속 호출해야한다. 이는 곳 대역폭 낭비로 이어진다. 요청하는 주기에 따라 short , long 등으로 구분할 수 있다. |
- TCP/IP를 이용한 전이중 방식(양방향)의 채용으로 클라이언트 <-> 서버 의 실시간 성을 보장해준다. |
HTTP를 활용한 단뱡향 통신 기술 클라이언트가 서버에 데이터를 요청하지 않아도 됨. 주기적으로 이벤트를 생성해 전송하는 방법 |
- 사실, SSE도 능동적인 형태의 '서버->클라이언트' 통신 방식은 아니다. 서버 푸시를 내기 위한 기술 중 1개일 뿐..
Pulling과 Polling의 차이
Pulling | Polling | |
---|---|---|
개념 | 데이터를 요청하는 방식으로 동작. 클라이언트가 필요한 데이터를 서버로 부터 직접 요청하는 방식 | 프로그램, 시스템에서 주기적으로 데이터나 상태를 확인하기 위해 지속적으로 조회하는 것. |
동작 | 클라이언트가 원하는 시점에 데이터 요청 -> 서버는 데이터 제공 |
요청을 보내고 응답을 기다리는 방식 + 주기적으로 확인함 |
예시 | 웹페이지 이동할 때 HTML, CSS, JS 등을 리소스를 받는 행위 | 새로고침을 통해 데이터 변경을 확인하는 것 |
Pull | Push | |
---|---|---|
아키텍처 | 클라이언트 -> 서버 | 서버 -> 클라이언트 |
SSE 방식을 채택한 이유?
결론 : Push알림은 실시간성이 다소 떨어져도 괜찮겠다 생각했기 때문.
- 이벤트가 발생했을 때 Push 알림을 보내는 것은 맞지만 즉각적인 알림일 필요는 없기 때문
이유 :
Push알림은 양방향 통신이 필요 없다.
- 유저A가 유저B에게 이벤트를 발생 시켰을 때, 유저B는 유저A가 수행한 이벤트를 받기만하면 되기 떄문이다.
뉴스피드 작성, 모임 생성, 댓글 작성 등은 채팅에 비해서 즉각적인 피드백을 원하지 않는 사용자 경험
네트워크 리소스 낭비를 줄일 수 있다.
- HTTP/1.1에
keepAlive
이란 개념이 추가되어 HTTP 연결을 일정 시간 유지할 수 있게 됨- keepAlive : 소켓 통신 같은 완전한 연결 유지, 상태유지, 양방향 통신과는 관련 없음
- HTTP연결을 재사용으로 인해 오버헤드가 적다.(stateful 상태가 됨)
- 별도의 프로토콜을 요구하는 것이 아닌 HTTP방식
✔️ 구현 해보자!
🏃🏻♂️ Push 알림 로직
- 유저가 로그인을 시도한다.
- 로그인이 성공적으로 이루어지면 SSE 커넥트를 위한 연결 요청을 1회 수행한다.
- 클라이언트 1명당 sseEmitter는 1개.
- 팔로우, 댓글 작성 등 클라이언트 간 이벤트가 발생하면 서버는 유저에서 이벤트를
전달한다.
👨🏻💻 서버 구현
- Controller : 클라이언트에게 Push알림을 보내기 위해 연결을 담당하는 영역
- Service : 서버에서 Push알림을 전송하기 위한 비즈니스 로직 담당하는 영역
- PushRepository : Push알림 로그를 관리
- EmitterRepository : 클라이언트 : Emitter = 1:1이기에 매핑 결과를 저장하는 영역
✅ Controller 살펴보기
@RequestMapping("/api/push")
@Slf4j
public class PushController {
private final PushService pushService;
// S-C-1. 클라이언트에게 Push연결
@GetMapping(value = "/connect/{userId}", produces = "text/event-stream")
@ResponseStatus(HttpStatus.OK)
public SseEmitter connect(@PathVariable Long userId,
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) throws IOException {
log.info("{}", lastEventId);
return pushService.connect(userId, lastEventId);
}
// S-C-2 : 이벤트 발생시 서비스를 Push 서비스를 발생시킨다.
@GetMapping(value = "/follow/{userId}", produces = "text/event-stream")
public void pushFollow(@PathVariable Long userId) {
log.info("왔나");
//=================
//비즈니스 로직을 수행한다.
//여기서는 유저A가 유저2를 팔로우하는 로직이라 가정
//=================
//비즈니스 로직 수행 이후 유저2에게 알림을 전송한다.
pushService.send(userId, PushType.FOLLOW, "유저 2님께서 회원님을 팔로우하기 시작했습니다.", "이동할 url");
}
}
S-C-1 : SSE를 이용해 클라이언트와 서버간 Push 알림을 제공
- /api/push/connect/{userId} 라는 엔드포인트로 접근 시, 클라이언트는 sseEmitter를 반환받는다. 이를 이용해 서버는 클라이언트에게 Push알림을 제공한다.
- SSE를 이용해 데이터를 주고 받을 수 있도록 응답 형태를
produces = "text/event-stream
으로 정의한다. - 헤더에
Last-Event-ID
: 이전에 받지 못한 이벤트가 존재하는 경우, 받은 마지막 이벤트 ID를 통해 미수신 목록을 받을 수 있게 하기 위해 포함한다.- 항상 전달 받는 정보는 아니끼 때문에
required=false
으로 선언했다. - 미수신 이벤트의 종료는 SSE 연결에 대한 시간 만료/종료와 같은 상황에서 이벤트가 발생하는 것을 말한다.
- 항상 전달 받는 정보는 아니끼 때문에
S-C-2 : 이벤트 발생시 서비스를 Push 서비스를 발생시킨다.
/follow/{userId}
라는 엔드포인트로 접근시 로직을 수행 후 Push 알림을 발생시킨다.
⚙️ Service 살펴보기
connect() : 연결(Push를 위한 구독)에 대한 비즈니스 로직을 담고 있다.
//타임 아웃 시간을 10분으로 한다.
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 10;
public SseEmitter connect(Long userId, String lastEventId) {
log.info("연결 : {}",lastEventId);
//새로운 Ssemitter를 만든 후, userId에 맞는 emitter 저장
String sseEmitterId = makeTimeIncludeId(userId);
SseEmitter sseEmitter = emitterRepository.save(sseEmitterId, new SseEmitter(DEFAULT_TIMEOUT));
//세션이 종료될 경우 저장한 SSEEmitter를 삭제한다.
//타임 아웃으로 새로운 연결을 요청할 때 새로운 emitter 객체를 생성하기 때문에 기존의 Emitter지운다.
sseEmitter.onCompletion(() -> emitterRepository.deleteById(sseEmitterId));
sseEmitter.onTimeout(() -> emitterRepository.deleteById(sseEmitterId));
// 503 에러가 발생하지 않도록 더미 데이터를 보내 연결을 유지한다.
String eventId = makeTimeIncludeId(userId); //개별 알림 이벤트를 식별하기 위한 값
sendEventToClient(sseEmitter, eventId, sseEmitterId, "EventStream Created. [userId= " + userId + "]");
//클라이언트가 미수신한 Event목록이 있을 경우 전송해 event 유실을 예방한다.
if (!lastEventId.isEmpty()) {
log.info("미수신 목록");
// 클라이언트가 마지막으로 수신한 이벤트 ID(last Event Id가 존재하면)
// 이전에 생성된 이벤트들을 가져와 클라이언트 에게 전송한다.
// 미수신 목록들 DB에 저장 -> 상태 고려하여 작성해야함
sendLostData(lastEventId, userId, sseEmitterId, sseEmitter);
// 미수신 목록들 전송했으므로 이벤트 캐시 삭제
emitterRepository.deleteAllEventCacheStartWithId(String.valueOf(userId));
}
return sseEmitter;
}
✔️구독
Spring에서 제공하는 sseEmitter 객체를 생성해서 저장한다.
필요할 때 마다 해당 유저가 생성한 sseEmitter를 불러와 이벤트 따른 Push를 전송한다
- 처음으로 호출할 때 더미 이벤트르 전송해 이벤트가 연결 되었음을 알려준다. 이때 더미 이벤트를 전송하지 않을 경우 503에러가 발생한다.
- 더미 이벤트로 연결이 완료되면 이전에 수신하지 못한 이벤트를 전송해 유실을 예방한다.
- sseEmitter를 반환해 클라이언트와 Stream을 유지하게 된다.
sseEmitterId | eventId |
---|---|
sseEmitter를 구분/관리하기 위한 식별자 ➡️ emitterRepository에서 관리해 유저별로 관리한다. |
개별 이벤트를 식별하기 위한 고유 값 ➡️ 각 이벤트는 고유한 Id를 가지고 있고, 클라이언트에게 전송될 때 이벤트 식별을 위해 사용된다. |
--- |
send() : 알림을 생성하고 지정된 수신자에게 알림을 전송하는 기능 수행
public void send(User sender, User receiver, PushType pushType, String content, String url) {
//Push 객체를 생성 및 저장
try{
Push push = pushRepository.save(createPush(sender, receiver, pushType, content, url));
Long receiverId = receiver.getUserId();
String eventId = makeTimeIncludeId(receiverId);
//로그인 한 유저의 모든 Emiiter를 불러온다
Map<String, SseEmitter> sseEmitters = emitterRepository.findAllEmitterStartWithByUserId(String.valueOf(receiverId));
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, push.toDto());//데이터 캐시를 저장한다(유실된 데이터가 발생할 경우 처리하기 위함
sendEventToClient(emitter, eventId, key, push.toDto());//데이터를 receiver에게 전송
}
);
}catch (Exception e){
e.printStackTrace();
}
}
- 알림을 생성하고 수신자(receiver)에게 알림을 전송한다.
- 매개 변수로 받은 정보를 통해 push 객체를 생성한다.
- push 객체는 엔티티로써 DB에 저장한다.
- eventId를 생성한다. 이 Id는 sseEmitter로 전송되는 이벤트 고유식별자로 사용된다.
- Push 전송 후 연결이 유실 될 떄 last-Event-Id로 해당 값이 전달된다.
- receiver의 Id와 같은 모든 sseEmitter 객체를
findAllEmitterStartWithByUserId
메소드를 통해 받아온다. - sendEventToClient : 생성된 Push 객체를 통해 유저에게 알림을 전송한다.
sendToClient() : send()메소드에서 생성된 알림을 sseEmitter 객체를 사용해 클라이언트에게 전송
//SseEmitter 객체를 사용하여 SSE를 클라이언트에게 전송하는 역할
private void sendEventToClient(SseEmitter sseEmmitter, String eventId, String sseEmitterId, Object data) {
try {
sseEmmitter.send(SseEmitter.event()
.id(eventId) //이벤트 고유 식별자
.name("sse") //이벤트 이름 지정
.data(data)); //이벤트로 전송할 데이터 설정
} catch (IOException e) {
emitterRepository.deleteById(sseEmitterId);
sseEmmitter.completeWithError(e);
}
}
- eventId : 발생한 이벤트의 이름
- name : sseEvent의 이름이다. 추후 클라이언트에서 해당 부분을 통해 콜백함수를 정의할 수 있다.
- sseEventName이 sse 이기 때문에 클라이언트에서 처리할 이벤트를 다음과 같이 정의할 수 있다.
eventSource.addEventListener('sse', ()=>{이벤트 로직};
- sseEventName이 sse 이기 때문에 클라이언트에서 처리할 이벤트를 다음과 같이 정의할 수 있다.
sendLostData() : 유저ID를 통해 미수신된 이벤트들을 다시 발신한다.
private void sendLostData(String lastEventId, Long userId, String emitterId, SseEmitter emitter) {
//유저 ID에 해당하는 모든 SSE 이벤트를 가져온다.
Map<String, Object> eventCaches = emitterRepository.findAllEventCacheStartWithByUserId(String.valueOf(userId));
//클라이언트로 마지막에 보낸 eventId와 eventCache에 저장된 eventId를 비교한다.
//lastEventId 보다 크면 서버와 연결이 끊겼을 때 생성된 이벤트 이므로 클라이언트에게 보내준다.
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendEventToClient(emitter, entry.getKey(), emitterId, entry.getValue()));
}
makeTimeIncludeId() : id에 이벤트가 발생한 시간을 더해 유실된 이벤트를 찾을 수 있게 한다.
private String makeTimeIncludeId(Long userId) {
return userId + "_" + System.currentTimeMillis();
}
왜 구분자(_) 뒤에 시간을 붙인거야?
- userId만 사용해 데이터를 보내면 언제 보내졌는지 알 수 없기 때문이다.
시간을 통해 가장 마지막에 발생한 이벤트를 구분 할 수 있다.
예를 들어. 유저 ID는 1234이다. 연결을 요청했을 때 발생시간은 3000이고, 총 3개의 이벤트가 발생했다고 가정해보자,
이벤트1 : 유저 ID 9999가 팔로우, 발생시간 3050
이벤트2 : 유저 ID 2227이 채팅 메세지 전송, 발생 시간 2950
이벤트3 : 유저 ID 1129가 모임 생성, 발생시간 3020
이 상황에서 유실된 이벤트는 2번, 1과3은 연결 이후 발생한 이벤트이다. 유저1234가 서버에 연결을 요청했을 때sendLostData
메소드를 수행하게 된다.
현재 시간은 1000이므로 현재 이벤트 아이디는 1234_1000, 이벤트2는 1234_950이고, 이벤트 아이디 값의 순서에 의해 이벤트2는 유저1234에게 전송이 된다.
createPush() : push 알림 객체를 만든다.
private Push createPush(User sender, User receiver, PushType pushType, String content, String url) {
return Push.builder()
.receiver(receiver)
.sender(sender)
.pushType(pushType)
.content(content)
.url(url)//url에 대한 컨벤션 정의가 필요할 듯 -> 클릭시 컨트롤러로 이동해야하니까
.isRead(false)
.build();
}
- Builder 패턴을 적용해 push 알림 서비스를 구현했다.
- 만들어진 Push 엔티티는 send메세지에서 db에 저장되고, 클라이언트에게 전달된다.
💽 PushRepository 살펴보기
@Repository
public interface PushRepository extends JpaRepository<Push, Long> {
List<Push> findByReceiverUserId(Long receiverId);
}
- JPA를 이용해서 push알림에 대한 로그를 관리한다.
- 수신자 Id를 통해 유저의 알림로그를 확인할 수 있는 findByReceiverUserId메소드가 존재한다..
🔔 EmitterRepository 살펴보기
확장성을 위해 interface와 구현채를 구분해서 작성했다.
EmitterRepository인터페이스
/**
* sseEmitter를 이용해 알림을 실제 보내게 되는데, 어떤 회원에게 어떤 emitter가 연결 되어 있는지 저장해야함
* 어떤 이벤트들이 현재 발생했는지도 저장(추후 emitter 연결 끊길 경우 이어서 전송해줘야하기 때문)
* => EnitterRepository인 이유
* */
public interface EmitterRepository {
//Emitter 저장
SseEmitter save(String sseEmitterId, SseEmitter sseEmitter);
//Event 저장
void saveEventCache(String sseEmitterId, Object event);
//회원과 관련된 Emitter를 모두 찾음
Map<String, SseEmitter> findAllEmitterStartWithByUserId(String userId);
//회원과 관련되s Event 모두 찾음
Map<String, Object> findAllEventCacheStartWithByUserId(String userId);
//Emitter를 지움
void deleteById(String emitterId);
//회원과 관련된 모든 Emitter 제거
void deleteAllEmitterStartWithId(String userId);
//회원과 관련된 모든 이벤트 제거
void deleteAllEventCacheStartWithId(String userId);
}
EnitterRepositoryImpl : 인터페이스 구현체
package com.ssafy.backend.repository;
import com.ssafy.backend.controller.PushController;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
@Repository
public class EmitterRepositoryImpl implements EmitterRepository{
//모든 emitter를 저장하는 ConcurrentHashMap
//ConCurrentHashMap을 사용함으로써 멀티쓰레드 환경에서도 동시성을 유지할 수 있게 한다.
private final ConcurrentHashMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();
//클라이언트 연결을 잃어도 이벤트 유실을 방지하기 위해 임시 저장
private final ConcurrentHashMap<String, Object> eventCache = new ConcurrentHashMap<>();
//Emitter 저장
@Override
public SseEmitter save(String sseEmitterId, SseEmitter sseEmitter){
//log.info("EmitterRepository 잡속, {}, {} ", sseEmitterId, sseEmitter);
log.info("emitter 저장 : {}", sseEmitterId);
emitters.put(sseEmitterId, sseEmitter);
return sseEmitter;
}
//Event 저장
@Override
public void saveEventCache(String eventCacheId, Object event) {
log.info("saveEventCache 접근 {} ", eventCacheId);
eventCache.put(eventCacheId, event);
}
//회원과 관련된 Emitter를 모두 찾음
@Override
public Map<String, SseEmitter> findAllEmitterStartWithByUserId(String userId) {
return emitters.entrySet().stream()
//sseEmitte의 key들은 userId_currentTime 형태여서 suffix가 userId인것만 찾게 한다.
.filter(entry -> entry.getKey().startsWith(userId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
//회원과 관련된 Event 모두 찾음
@Override
public Map<String, Object> findAllEventCacheStartWithByUserId(String userId) {
return eventCache.entrySet().stream()
//sseEmitte의 key들은 userId_currentTime 형태여서 suffix가 userId인것만 찾게 한다.
.filter(entry -> entry.getKey().startsWith(userId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
//주어진 ID 와 Emitter를 제거
@Override
public void deleteById(String sseEmitterId){
emitters.remove(sseEmitterId);
};
//회원과 관련된 모든 Emitter 제거
@Override
public void deleteAllEmitterStartWithId(String userId) {
emitters.forEach(
(key, emitter) -> {
//sseEmitte의 key들은 userId_currentTime 형태여서 suffix가 userId인것만 찾아 삭제한다.
if (key.startsWith(userId)) {
emitters.remove(key);
}
}
);
}
//회원과 관련된 모든 이벤트 제거
@Override
public void deleteAllEventCacheStartWithId(String userId) {
eventCache.forEach(
(key, emitter) -> {
//sseEmitte의 key들은 userId_currentTime 형태여서 suffix가 userId인것만 찾아 삭제한다.
if (key.startsWith(userId)) {
eventCache.remove(key);
}
}
);
}
}
클라이언트 구현
서버를 구현했으니 Push알림을 받아보자.
HTML 살펴보기
<button onclick="connectToServer()">서버 연결</button>
<button onclick="myClear()">clear</button>
<br />
<!-- C-3 -->
<button onclick="window.open('http://localhost:8080/api/push/follow/5','window_name','width=430,height=500,location=no,status=no,scrollbars=yes');">팔로우 알림 테스트</button>
<br />
<div id="pushList">
<h3>이벤트 결과 출력</h3>
</div>
function myClear() {
document.getElementById("pushList").innerText = "";
}
function connectToServer() {
//C-1 :
let userId = 5; // 사용자 아이디
let eventSource = new EventSource(`http://localhost:8080/api/push/connect/${userId}`, {
withCredentials: true,
});
//C-2 : 서버로 부터 오는 PushEvnet타입이 sse면 다음과 같은 동작을 수행한다.
eventSource.addEventListener("sse", (event) => {
console.log(event.lastEventId);
showResult(event.data);
});
}
//이벤트가 발생하면 이벤트 내용을 화면에 출력한다.
function showResult(notificationData) {
const p = document.createElement("div");
p.innerText = notificationData;
document.getElementById("pushList").appendChild(p);
}
C-1. Push알림을 수신받기 위한 EventSource 등록
- Push
C-2. Push 이벤트 타입에 따른 이벤트 처리
- PushService의
sendToclient
메소드를 보면 sseEmitter의 EventName을"sse"
라고 지정한 것을 볼 수 있다. - 서버에서 EventName을 다양하게 만들어 클라이언트에서 다양하게 이벤트를 처리도 할 수 있다.
- sseEmitter의 EnvetName을 지정해주지 않을 경우 default EventName은 'message' 이다.
C-3. 이벤트를 발생에 따른 Push 알림 수신
- 팔로우 알림 테스트 버튼클 클릭하면 서버에서 follow를 수행하고 Push 이벤트를 반환한다.
마무리
이렇게 SSE를 활용한 Push알림 기능에 대해 내용르 정리해 보았다.
Push EventName을 다양하게 하면 그만큼 이벤트를 처리할 수 있으니 좀 더 프로젝트가 끝나면 좀더 확장해서 서비스를 구현해봐야겠다.
또 FCM이라는 구글의 서비스 구글 폼 미쳤따이도 있으니 다음은 이것을 이용해 Push를 구현해보지 않을까..?
참고링크
개발자가 된 핏짜님 tistory
Hjjju님 velog
지단로보트님 velog
gilssang97님 tistory
airluca님 velog
우테코 4기 어썸오 정수현님 블로그