Запуск всего
docker-compose -f docker/all.yml up
Остановка всего
docker-compose -f docker/all.yml down
Kafka - золотая корова проекта.
Доступна на сети хоста на порту 9092
.
При старте контейнера создается два топика:
messages
- содержит события отправки сообщенийread
- содержит события прочтения пользователями сообщений (для сброса кэша прочитанных сообщений)
Наблюдать за состояниями топиков можно через kafka-console-consumer
.
За messages
:
docker run --rm --network host wurstmeister/kafka:0.10.2.0 kafka-console-consumer.sh --topic messages --from-beginning --bootstrap-server localhost:9092
За read
:
docker run --rm --network host wurstmeister/kafka:0.10.2.0 kafka-console-consumer.sh --topic read --from-beginning --bootstrap-server localhost:9092
UI кластера Kafka.
После запуска доступен по http://localhost:9000.
Для добавления в него поднятого рядом нашего "кластера" Kafka делаем следующее.
В меню Cluster -> Add Cluster
в поле Cluster Name
указываем произвольное имя кластера,
а в поле Cluster Zookeeper Host
передаем zookeeper:2181
. Версия кластера не столь важна.
После добавления на главной странице в списке кластеров появляется наш
кластер, и мы можем в зайти посмотреть на него.
UI СУБД, в нашем случае удобно через него смотреть на состояние инстанса PostgreSQL. После запуска доступен по http://localhost:8081/. Для добавления в него поднятого рядом нашего инстанса PostgreSQL заполняем поля следующим образом:
System
=PostgreSQL
Server
В консоли выполняем
docker network inspect docker_default
В выхлопе находим раздел, соответсвующий контейнеру postgres
.
Например, он может выглядеть так
"830b8d68eb90499946336da7301ef51fe005bfdee7985255c1f982840194074e": {
"Name": "postgres",
"EndpointID": "6535e8cc305ca0a89bb743ed81697ede3179c9959d9ef476171e89da0cc7270d",
"MacAddress": "02:42:c0:a8:30:03",
"IPv4Address": "192.168.48.3/20",
"IPv6Address": ""
}
Берем адрес из значение поля IPv4Address
, в примере это 192.168.48.3
, и указываем его в качестве
сервера.
Username
=channel
Password
=password
Database
=channel
Приложение представляет из себя append-only "канал", куда различные пользователи могут отправлять сообщения.
Каждое сообщение состоит из:
- целочисленного "уникального" монотонно возрастающего идентификатора
- пользователя-автора сообщения
- времени публикации
- произвольного текста
Приложение имеет REST API, Swagger-интерфейс которого находится по адресу http://localhost:8081/swagger-ui.html.
Доступные ресурсы и методы:
/messages
GET
- получения указанного числа сообщений в канале начиная с заданного сообщенияPOST
- отправка сообщения в канал
/messages/search
GET
- полнотекстовый поиск сообщений в канале
/messages/unread
GET
- для заданного пользователя возвращаетtrue/false
- есть ли у него новые (непрочитанные) сообщенияDELETE
- "удаляет" "непрочитанность" сообщений до заданного сообщения (тем самым пользователь отмечает, что прочитал сообщения до заданного)
Во все ресурсы идентификатор пользователя передается через заголовок x-user
.
Для простоты демо пользователи имеют строковый идентификатор,
который по-совместительству служит и его отображаемым именем.
Реализация содержит следующие компоненты:
StorageService
поверхPosgtreSQL
как основное хранилище сообщенийSearchService
на основеElastic
для полнотекстового поиска сообщенийReadCache
сRedis
под капотом для хранения ID сообщения в канале и ID последнего прочитанного для каждого пользователя - сравнивая эти два идентификатора можно понять, есть ли у заданного пользователя непрочитанные сообщения в канале
Доступные две основные реализации ChannelSerivce
, на которых происходит иллюстрация различных подходов.
-
DualWriteChannelService
- "классический" подход. При отправке сообщения параллельно модифицирует данные во всех подсистемах - сохраняет в основную БД, отправляет на индексацию вElastic
и отмечает изменения в кэше. Такой подход имеет следующие подводные камни.- Гонки. Например, если примерно в одно и то же время отправилось два сообщения, в кэш будет выполнено два запроса на модификацию идентификатора последнего сообщения в канале, в результате чего в кэше может "осесть" ID не последнего сообщения.
- Вечная неконсистеность между компонентами системы ("perpetual inconsistency" как антоним "eventual consistency").
Например, если в момент отправки сообщения был недоступен
Elastic
, то мы не проиндексируем текст этого сообщения, в итоге мы не сможем его найти при полнотекстовом поиске. Даже после восстановления доступностиElastic
это состояние само не "рассосется" и без принятия специальных шагов останется таким навсегда. И чем дольше работает такая система, тем больше накапливается несоответствия данных между ее частями.
-
LogWriteChannelService
- все события (факты) модификации данных в системе (отправка сообщения и отметка пользователем прочитанности) сначала попадают в лог (в роли которого выступаетKafka
). События неизменямые, всегда пишутся только в конец лога (append only). К этому логу независимо "подсасываются" читатели, каждый из которых транслирует изменения в свой компонент системы:StorageConsumer
- читает события отправки сообщений и сохраняет их вPostgreSQL
IndexConsumer
- индексирует отправляемые сообщения вElastic
CacheConsumer
- сохраняет ID последнего сообщения вRedis
ReadEventConsumer
- сохраняет ID последнего прочитанного пользователем сообщения вRedis
Подход имеет следующие сильные стороны.
- Отсутствие гонок. Записи "упорядочиваются" логом и гонки не возможны в принципе. Лог "индуцирует" порядок событий, который становится единым для всех потребителей.
- Автоматическое восстановление при сбоях. Даже если какая-то часть подсистем становится не доступной, временно останавлиется обновление данных в них. Но после восстановления доступности сооветствующие читатели потребляют события из лога с того места, до которого они дошли до сбоя, автоматически приводя систему к целостному состоянию. Мы имеем только "eventual consistency", но не "perpetual inconsistency" без каких-либо дополнительных инструментов.
- Слабая связность. Читатели лога ничего не знают друг о друга, а уж тем более писатель в лог ничего не знает о читателях. Читатели могут добавляться в разное время независимыми командами без какой-либо модификации исходного приложения. Мы получаем "loosely coupled system" со всеми ее достоинствами.
- Устойчивость к багам. Если вдруг мы поняли, что некорректно индексируем данные из-за ошибки в коде, то в любой момент можем исправить эту ошибку, после чего перечитать лог, построить новый корректный индекс и переключиться на него.
Запускать приложение можно из командной строки.
Аргумент channelService
определяет, какая реализация ChannelService
будет использована:
dual-write
дляDualWriteChannelService
./gradlew run --args='--channelService=dual-write'
log-write
дляLogWriteChannelService
./gradlew run --args='--channelService=log-write'
После запуска приложения можно пойти в интерактивный UI API
http://localhost:8080/swagger-ui.html,
начать совершать какие-либо операции, попутно отключая различные компоненты системы, и смотреть, как ведет себя та или
иная реализация ChannelService
.
Для отключения/включения основных хранилищ можно делать:
docker {stop|start} {posgres|kafka|redis|elastic}
Стоит отметить, что при поднятии окружения в Docker никакие персистентные volume
-ы не подключаются.
Это значит, что если мы будем делать docker {stop|start}
контейнера, то записанные данные мы не потеряем.
А вот если мы сделаем docker-compose -f docker/all.yml down
или docker rm
на конкретный контейнер, записанные данные
будут потеряны.