Post

Java ConcurrentSkipListMap

1. 스트림 정렬 로직

여러 스레드에서 지속적으로 발생하는 이벤트 스트림이 있다고 가정한다. 마지막 60초의 이벤트와 60초보다 오래된 이벤트를 가져올 수 있어야 한다.

먼저 이벤트 데이터의 구조를 정의한다.

1
2
3
4
5
6
public class Event {
    private ZonedDateTime eventTime;
    private String content;

    // standard constructors/getters
}

eventTime 필드를 사용하여 이벤트를 정렬된 상태로 유지하려고 한다. ConcurrentSkipListMap을 사용하여 이를 달성하려면 인스턴스를 생성하는 동안 Comparator를 생성자에 전달해야 한다.

1
2
ConcurrentSkipListMap<ZonedDateTime, String> events = new ConcurrentSkipListMap<>(
    Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

타임스탬프를 사용하여 도착한 모든 이벤트를 비교한다. compareLong() 메서드를 사용하고 있으며 ZonedDateTime에서 긴 타임스탬프를 취할 수 있는 추출 함수를 전달하고 있다.

이벤트가 도착하면 put() 메서드를 사용하여 맵에 이벤트를 추가하기만 하면 된다. 이 방법에는 명시적인 동기화가 필요하지 않다.

1
2
3
public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}

ConcurrentSkipListMap은 생성자에서 전달된 Comparator를 사용하여 아래에 있는 해당 이벤트의 정렬을 처리한다.

ConcurrentSkipListMap의 가장 눈에 띄는 장점은 잠금 없는 방식으로 데이터의 변경 불가능한 스냅샷을 만들 수 있는 방법이다. 지난 1분 이내에 도착한 모든 이벤트를 가져오려면 tailMap() 메서드를 사용하고 요소를 가져오려는 시간을 전달할 수 있다.

1
2
3
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

지난 1분 동안의 모든 이벤트를 반환한다. 그것은 변경 불가능한 스냅샷이 될 것이며 가장 중요한 것은 다른 쓰기 스레드가 명시적 잠금을 수행할 필요 없이 ConcurrentSkipListMap에 새 이벤트를 추가할 수 있다는 것이다.

이제 headMap() 메서드를 사용하여 지금부터 1분 후에 도착한 모든 이벤트를 가져올 수 있다.

1
2
3
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

그러면 1분보다 오래된 모든 이벤트의 변경 불가능한 스냅샷이 반환된다.

2. 정렬 스트림 로직 테스트

ConcurrentSkipListMap을 사용하여 정렬 논리를 구현한 후에는 각각 100개의 이벤트를 보내는 두 개의 작성기 스레드를 만들어 테스트 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

각 스레드는 acceptEvent() 메서드를 호출하여 지금부터 “현재 마이너스 100초”까지 eventTime이 있는 이벤트를 전송한다.

그동안 1분 기간 내에 있는 이벤트의 스냅샷을 반환하는 getEventsFromLastMinute() 메서드를 호출할 수 있다.

1
2
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsFromLastMinute();

eventsFromLastMinute의 이벤트 수는 생산자 스레드가 이벤트를 EventWindowSort로 보내는 속도에 따라 각 테스트 실행에서 달라진다. 반환된 스냅샷에 1분보다 오래된 단일 이벤트가 없다고 주장할 수 있다.

1
2
3
4
5
6
7
long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventsOlderThanOneMinute, 0);

그리고 스냅샷에는 1분 기간 내에 있는 이벤트가 0개 이상 있다.

1
2
3
4
5
6
7
long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventYoungerThanOneMinute > 0);

getEventsFromLastMinute()는 아래의 tailMap()을 사용한다.

ConcurrentSkipListMap에서 headMap() 메서드를 사용하는 getEventsOlderThatOneMinute()를 테스트 한다.

1
2
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsOlderThatOneMinute();

이번에는 1분보다 오래된 이벤트의 스냅샷을 얻는다. 이러한 이벤트가 0개 이상 있다고 주장할 수 있다.

1
2
3
4
5
6
7
long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventsOlderThanOneMinute > 0);

다음으로 마지막 순간에 발생하는 단일 이벤트가 없다는 것이다.

1
2
3
4
5
6
7
long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(eventYoungerThanOneMinute, 0);

가장 중요한 점은 다른 스레드가 여전히 ConcurrentSkipListMap에 새 값을 추가하는 동안 데이터의 스냅샷을 찍을 수 있다는 것이다.

[출처 및 참고]

This post is licensed under CC BY 4.0 by the author.