Skip to content

dormidon/kafka-example

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

kafka-example

Настройка окружения

Запуск всего

docker-compose -f docker/all.yml up

Остановка всего

docker-compose -f docker/all.yml down

Kafka - золотая корова проекта. Доступна на сети хоста на порту 9092. При старте контейнера создается два топика:

  • messages - содержит события отправки сообщений
  • read - содержит события прочтения пользователями сообщений (для сброса кэша прочитанных сообщений)

Наблюдать за состояниями топиков можно через kafka-console-consumer.
За messages:

docker run --rm --network host wurstmeister/kafka:0.10.2.0 kafka-console-consumer.sh --topic messages --from-beginning --bootstrap-server localhost:9092

За read:

docker run --rm --network host wurstmeister/kafka:0.10.2.0 kafka-console-consumer.sh --topic read --from-beginning --bootstrap-server localhost:9092

UI кластера Kafka. После запуска доступен по http://localhost:9000. Для добавления в него поднятого рядом нашего "кластера" Kafka делаем следующее. В меню Cluster -> Add Cluster в поле Cluster Name указываем произвольное имя кластера, а в поле Cluster Zookeeper Host передаем zookeeper:2181. Версия кластера не столь важна. После добавления на главной странице в списке кластеров появляется наш кластер, и мы можем в зайти посмотреть на него.

UI СУБД, в нашем случае удобно через него смотреть на состояние инстанса PostgreSQL. После запуска доступен по http://localhost:8081/. Для добавления в него поднятого рядом нашего инстанса PostgreSQL заполняем поля следующим образом:

  • System = PostgreSQL
  • Server

В консоли выполняем

docker network inspect docker_default

В выхлопе находим раздел, соответсвующий контейнеру postgres. Например, он может выглядеть так

"830b8d68eb90499946336da7301ef51fe005bfdee7985255c1f982840194074e": {
 "Name": "postgres",
 "EndpointID": "6535e8cc305ca0a89bb743ed81697ede3179c9959d9ef476171e89da0cc7270d",
 "MacAddress": "02:42:c0:a8:30:03",
 "IPv4Address": "192.168.48.3/20",
 "IPv6Address": ""
}

Берем адрес из значение поля IPv4Address, в примере это 192.168.48.3, и указываем его в качестве сервера.

  • Username = channel
  • Password = password
  • Database = channel

Работа с приложением

Приложение представляет из себя append-only "канал", куда различные пользователи могут отправлять сообщения.

Модель данных

Каждое сообщение состоит из:

  • целочисленного "уникального" монотонно возрастающего идентификатора
  • пользователя-автора сообщения
  • времени публикации
  • произвольного текста

API

Приложение имеет REST API, Swagger-интерфейс которого находится по адресу http://localhost:8081/swagger-ui.html.

Доступные ресурсы и методы:

  • /messages
    • GET - получения указанного числа сообщений в канале начиная с заданного сообщения
    • POST - отправка сообщения в канал
  • /messages/search
    • GET - полнотекстовый поиск сообщений в канале
  • /messages/unread
    • GET - для заданного пользователя возвращает true/false - есть ли у него новые (непрочитанные) сообщения
    • DELETE - "удаляет" "непрочитанность" сообщений до заданного сообщения (тем самым пользователь отмечает, что прочитал сообщения до заданного)

Во все ресурсы идентификатор пользователя передается через заголовок x-user. Для простоты демо пользователи имеют строковый идентификатор, который по-совместительству служит и его отображаемым именем.

Внутреннее устройсто

Реализация содержит следующие компоненты:

  • StorageService поверх PosgtreSQL как основное хранилище сообщений
  • SearchService на основе Elastic для полнотекстового поиска сообщений
  • ReadCache с Redis под капотом для хранения ID сообщения в канале и ID последнего прочитанного для каждого пользователя - сравнивая эти два идентификатора можно понять, есть ли у заданного пользователя непрочитанные сообщения в канале

Доступные две основные реализации ChannelSerivce, на которых происходит иллюстрация различных подходов.

  • DualWriteChannelService - "классический" подход. При отправке сообщения параллельно модифицирует данные во всех подсистемах - сохраняет в основную БД, отправляет на индексацию в Elastic и отмечает изменения в кэше. Такой подход имеет следующие подводные камни.

    • Гонки. Например, если примерно в одно и то же время отправилось два сообщения, в кэш будет выполнено два запроса на модификацию идентификатора последнего сообщения в канале, в результате чего в кэше может "осесть" ID не последнего сообщения.
    • Вечная неконсистеность между компонентами системы ("perpetual inconsistency" как антоним "eventual consistency"). Например, если в момент отправки сообщения был недоступен Elastic, то мы не проиндексируем текст этого сообщения, в итоге мы не сможем его найти при полнотекстовом поиске. Даже после восстановления доступности Elastic это состояние само не "рассосется" и без принятия специальных шагов останется таким навсегда. И чем дольше работает такая система, тем больше накапливается несоответствия данных между ее частями.
  • LogWriteChannelService - все события (факты) модификации данных в системе (отправка сообщения и отметка пользователем прочитанности) сначала попадают в лог (в роли которого выступает Kafka). События неизменямые, всегда пишутся только в конец лога (append only). К этому логу независимо "подсасываются" читатели, каждый из которых транслирует изменения в свой компонент системы:

    • StorageConsumer - читает события отправки сообщений и сохраняет их в PostgreSQL
    • IndexConsumer - индексирует отправляемые сообщения в Elastic
    • CacheConsumer - сохраняет ID последнего сообщения в Redis
    • ReadEventConsumer - сохраняет ID последнего прочитанного пользователем сообщения в Redis

    Подход имеет следующие сильные стороны.

    • Отсутствие гонок. Записи "упорядочиваются" логом и гонки не возможны в принципе. Лог "индуцирует" порядок событий, который становится единым для всех потребителей.
    • Автоматическое восстановление при сбоях. Даже если какая-то часть подсистем становится не доступной, временно останавлиется обновление данных в них. Но после восстановления доступности сооветствующие читатели потребляют события из лога с того места, до которого они дошли до сбоя, автоматически приводя систему к целостному состоянию. Мы имеем только "eventual consistency", но не "perpetual inconsistency" без каких-либо дополнительных инструментов.
    • Слабая связность. Читатели лога ничего не знают друг о друга, а уж тем более писатель в лог ничего не знает о читателях. Читатели могут добавляться в разное время независимыми командами без какой-либо модификации исходного приложения. Мы получаем "loosely coupled system" со всеми ее достоинствами.
    • Устойчивость к багам. Если вдруг мы поняли, что некорректно индексируем данные из-за ошибки в коде, то в любой момент можем исправить эту ошибку, после чего перечитать лог, построить новый корректный индекс и переключиться на него.

Запуск приложения

Запускать приложение можно из командной строки.

Аргумент channelService определяет, какая реализация ChannelService будет использована:

  • dual-write для DualWriteChannelService
    ./gradlew run --args='--channelService=dual-write'
    
  • log-write для LogWriteChannelService
    ./gradlew run --args='--channelService=log-write'
    

После запуска приложения можно пойти в интерактивный UI API http://localhost:8080/swagger-ui.html, начать совершать какие-либо операции, попутно отключая различные компоненты системы, и смотреть, как ведет себя та или иная реализация ChannelService.

Для отключения/включения основных хранилищ можно делать:

docker {stop|start} {posgres|kafka|redis|elastic}

Стоит отметить, что при поднятии окружения в Docker никакие персистентные volume-ы не подключаются. Это значит, что если мы будем делать docker {stop|start} контейнера, то записанные данные мы не потеряем. А вот если мы сделаем docker-compose -f docker/all.yml down или docker rm на конкретный контейнер, записанные данные будут потеряны.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published