HyperStorage использует cassandra для оперативного хранения и kafka для публикации событий об изменениях данных. Основные возможности HyperStorage:
- Сохранение и чтение документов в хранилище;
- Работа с коллекциями;
- Поддержка публикации событий при изменениях данных;
- поддержка GET,PUT,DELETE,PATCH;
- POST для коллекций.
Интерфейс сервиса описан в файле hyperstorage.raml
Для того, чтобы работать с HyperStorage используя типизированые классы, нужно подключить RAML файл содержащий спецификацию hyper-storage используя hyperbus плагин. Для этого, в projects/plugin.sbt
добавить:
addSbtPlugin("eu.inn" % "hyperbus-sbt-plugin" % "0.1.82")
resolvers ++= Seq("Innova plugins" at "http://repproxy.srv.inn.ru/artifactory/plugins-release-local")
И в build.sbt проекта добавить:
ramlHyperbusSource := file("hyperstorage.raml")
ramlHyperbusPackageName := "com.hypertino.hyperstorage.api"
В качестве RAML файла может быть общий с другими сервисами файл, но он должен включать спецификацию для HyperStorage.
HyperStorage поддерживает работу с контентом в хранилище через URI: /hyper-storage/content/{path:*}
path:* в hyperbus позволяет работать с путем, который содержит множество сегментов. Т.е. path может быть равен abc/xyz/kkk
Для документов поддерживается операции GET, PUT, PATCH и DELETE. В тело документа можно положить любой JSON объект, его схема не ограничена чем-либо.
Важно, чтобы в URI/path документов, отсутсвовала тильда ~ в конце последнего/предпоследнего сегмента, это признак коллекции.
В этом примере посылается типизированый запрос HyperStorageContentPut
для вставки документа.
hyperbus <~ HyperStorageContentPut("abc/123", DynamicBody(Obj.from("a" → 10, "x" → "hello"))) map {
case Ok(body) ⇒ println("abc/123 is updated")
case Created(body) ⇒ println("abc/123 is created")
} recover {
case hyperbusError: ErrorResponse ⇒ println(hyperbusError.body.code)
case otherException ⇒ println("something wrong")
}
В теле посылается Obj
из библиотеки binders. В примере выше он будет сериализован в JSON объект {"a":10, "x":"hello"}
В случае отсутствия объекта по указанному пути, он создается. Если уже существует, заменяется целиком.
Важно понимать, что при сохранении документов в HyperStorage поля объекта JSON с значением null удаляются.
hyperbus <~ HyperStorageContentGet("abc/123") map {
case Ok(body, _) ⇒ println("abc/123 is fetched:", body.content)
} recover {
case NotFound(body) ⇒ println("abc/123 is not found")
case hyperbusError: ErrorResponse ⇒ println(hyperbusError.body.code)
case otherException ⇒ println("something wrong")
}
В body.content содержится тот-же самый объект Obj
/Value
из binders.
hyperbus <~ HyperStorageContentDelete("abc/123") map {
case Ok(body) ⇒ println("abc/123 is deleted.")
} recover {
case NotFound(body) ⇒ println("abc/123 is not found")
case hyperbusError: ErrorResponse ⇒ println(hyperbusError.body.code)
case otherException ⇒ println("something wrong")
}
Все выглядит точно также, как и для PUT. Разница в том, что нужно посылать HyperStorageContentPatch
. При этом:
- PATCH можно применить только к уже существующему документу;
- результатом выполнение PATCH является объединение полей уже существующего с PATCH телом из запроса;
- для удаления полей в уже существующем документе, нужно указать их в теле запроса с значением null
- При сохранении документов в HyperStorage поля объекта с значением null удаляются
У каждого документа имеется номер ревизии и HyperStorage гарантирует, что он будет последовательно (без пропусков) инкрементиться при изменении документа. Этот номер ревизии возвращается для GET запроса в заголовке revision
из hyperbus.
При изменении документов, для каждого из запросов PUT,PATCH,DELETE публикуется событие с тем-же самым URI в hyperbus. При этом метод меняется на FEED:PUT, FEED:PATCH и FEED:DELETE соответственно. В теле события содержится именно то тело запроса, которое привело к изменению ресурса. Также событие содержит заголовок revision
соответствующий состоянию документа. Таким образом реализована поддержка надежных фидов для HyperFacade.
В целом к коллекциям применимо все, что написано про документы. Каждая коллекция это документ со своим revision
+ свои механизмы для изменения и выборки его элементов, приспособленные к большому разммеру коллекции.
Также для коллекций поддерживаются индексы. Для работы с коллекциями используются все те-же самые объекты HyperStorageContentGet
, HyperStorageContentPut
, etc, что и с документами.
Коллекция идентифицируется тильдой в конце сегмента, т.е. вот это: abc~
, abc/xyz~
— коллекции, а вот это: abc
, abc/~xyz
— документы. Если нужно, чтобы URI документа содержал тильду, значит его нужно кодировать каким-либо образом.
Важно понимать, что поскольку коллекция обрабатывается на одной ноде HyperStorage и хранится в одной строке (ROW) в Cassandra, которая идентифицируется первичным ключем. Т.е. она не может масштабироваться и к ней применимы ограничения для ROW Cassandra.
Каждый элемент коллекции содержит идентификатор, который указывается в пути к нему.
В случае, если путь коллекции равен abc~
, то его элементы будут иметь пути: abc~/1
, abc~/2
, etc. При этом в теле элемента коллекции также всегда содержится поле id
с значением этого идентификатора. Оно всегда заполняется на стороне HyperStorage, в соответствии с путем элемента.
Для коллекции поддерживается вставка с использованием POST запроса, который генерирует новый уникальный идентификатор для элемента коллекции и вставляет его в коллекцию. При этом, будет опубликовано событие FEED:PUT (не POST) содержащее необходимое тело. Уникальные идентификаторы, генерируемые в HyperStorage гарантированно, в рамках одной коллекции возрастают.
При запросе содержимого коллекции через GET, по умолчанию возвращается первые N (100 — настраивается в HyperStorage) записей коллекции. Для того, чтобы запросить больше элементов или сделать постраничный вывод нужно использовать три параметра Query запроса GET:
size
— количество запрашиваемых элементов;filter
— выражение фильтра, в видеid > '10'
;sort
— выражение сортировки, в виде+a,-b
, что означает сортировать сначала по a, потом по b (descending). + можно опускать, т.е.sort=+a
эквивалентенsort=a
.
Элементы по умолчанию отсортированы в порядке возрастания id
.
Пример запроса в hyperbus, с указанием этих полей:
hyperbus <~ HyperStorageContentGet("collection-1~",
body = new QueryBuilder() sortBy Seq(SortBy("id")) add("size", 50) add("filter", "b > 10") result()
)
Если-же запрос идет из фасада, то это будет соответственно:
/hyper-storage/content/collection-1~?size=50&sort=id&filter=b%3E10
Элементы из коллекции возвращаются в виде массива, пример в JSON:
[
{
"appId": "1",
"name": "RF Online",
"id": "1",
},
{
"appId": "1006",
"name": "Lineage2 EU",
"id": "1006"
}
]
При отсутствии индексов на коллекции, она может быть эффективно отсортирована только по id
. Также по id
можно эффективно применить фильтр на равенство или диапазон. Учитывая это, можно построить постраничный вывод следующим образом:
- Для первой страницы запрашиваем
collection~?size=50
- для последующих страниц запрашиваем
collection~?size=50&id>"abc"
, где"abc"
— значениеid
последнего элемента предыдущей страницы.
Такой подход к постраничному выводу наиболее точно ложится на то, как коллекция хранится в cassandra.
В случае, если колллекция большая и нам нужны запросы к ней, которые фильтруют и/или сортируют не по значению id
, то можно создать для нее индекс.
Пример создания индекса запросом через hyperbus:
hyperbus <~ HyperStorageIndexPost("abc~", HyperStorageIndexNew(
indexId = Some("index1"), // название
sortBy = Seq(HyperStorageIndexSortItem("b", order = Some("asc"), fieldType = Some("decimal"))), // сортировка
filter = Some("b > 10")) // выражение фильтрации
)
в случае, если запрос отправляется через фасад (и это разрешено), то:
POST /hyper-storage/indexes/abc~
Content-Type: application/vnd.hyper-storage-index-new+json
{
"indexId":"index1",
"sortBy": [{"fieldName":"b","order":"asc","fieldType":"decimal"}],
"filter": "b > 10"
}
TODO: path:* не актуален, изменить доку
Важно, что в случае создания индекса, путь к коллекции указан в hyperbus как {path}, а не {path:*} и это означает, что если в пути есть более одного сегмента, то его нужно кодировать. Т.е. индекс для ресурса
abc/xyz~
создается с путемabc%2Fxyz
Используемые поля запроса имеют следующий смысл:
indexId
— уникально идентифицирует индекс в рамках коллекции, если с таким идентификатором индекс уже существует, при выполнении запроса вернется ошибка. Можно опустить значение и тогда будет индентификатор будет сгенерирован системой;sortBy
— сортировка индекса, необходимо указать поля, способ сортировки (текстовый или численный) и направление сортировки (по возрастанию или убыванию); ЕслиsortBy
не указан, то будет использоваться сортировка поid
элемента.filter
— выражение для фильтрации элементов, если опустить, то в индекс включаются все элементы.
Индекс содержит полную копию данных из основной таблицы, что может оказаться неприемлимым в случае если тело элемента большого размера
После запроса на создание индекса, HyperStorage начнет индексировать таблицу в фоновом режиме. До момента, пока индексация не завершится, при выполнении запросов индекс не используется. Все изменения которые вносятся в основную таблицу автоматически дублируются в индексную таблицу.
При каждом запросе, HyperStorage анализирует какой индекс использовать для выполнения запроса. При анализе используется поля sort
и filter
, то насколько они соответствуют индексу. Наибольший приоритет имеет индекс, который точно соответствует. Если такого нет, то может быть выбран индекс, который покрывает запрос, примеры:
- в запросе sort=
a,b
. А индекс построен поa
. - в запросе выражение
a > 10
, а в индексеa > 5
. Также работает когдаa > 10 and id > "abc"
, а индексa > 5
, и в других более сложных случаях, когда множество параметров выражения запроса входит в множество параметров выражения фильтра.
Для того, чтобы удовлетворить выражению, HyperStorage может выполнять несколько запросов в casandra и дополнительную фильтрацию и сортировку в соответствии с запросом. В худшем случае, который может быть если сортировка идет по полю, для которого нет индекса, HyperStorage попытается прочитать всю коллекцию, отсортировать и выдать согласно запросу.
В случае большой таблицы, сканирование будет прекращено при чтении более 10000
записей + размер запрошеной таблицы и HyperStorage вернет ошибку. Этим значением можно управлять указывая параметр skipMax
в запросе.
Также, выборка будет оборвана, если запрос приведет к более чем 20
запросам в cassandra.
Логика шардирования реализована в ShardProcessor.scala. При запуске ноды HyperStorage она, через Akka Cluster координирует с другими нодами HyperStorage состав и состояние нод HyperStorage. Для общения между собой ноды используют Akka Cluster напрямую (без Hyperbus). Используются следующие состояния:
Unknown
Passive
— нода подключилась и может посылать команды другим нодам, но не берет на себя работу;Activating
— нода в процессе активирования, разослала всем информацию и ждет подтверждения об этом;Active
— нода получила от всех подтверждение и активна;Deactivating
— нода в процессе деактивациии и завершает работу;
Задачи, которые посылают друг другу реализуют интерфейс ShardTask
и содержат строковый параметры key
, который используется для вычисления ноды, которая ответственна за эту задачу. Это вычисление выполняется используя consistent hashing, чтобы минимизировать перекидывания задач друг-другу при включении и выключении ноды.
Каждая нода, получив задачу, сначала проверяет является-ли она владельцем этой задачи. Владельцами задач могут быть только те ноды, которые находятся в состоянии Activating
или Active
. Если нода получила задачу, владельцем которой она не является, она ее форвардит владельцу.
Задача, пока ее кто-то придержал или форвардил другому может стать устаревшей ShardTask.isExpired
, в этом случае она выбрасывается.
В Akka Cluster, все ноды HyperStorage должны иметь роль
"hyperstorage"
(параметр akka.cluster.roles)
При активации нода ждет подтерждения от всех остальных в кластере, что ей можно начинать работу. При этом, поскольку она уже стала владельцем ноды, ей сразу начнут форвардить задачи. Эти задачи откладываются до тех пор, пока нода не получит подтверждения от всех.
При деактивации нода завершает те задачи, которые уже начаты и пересылает новому владельцу задачи, которые еще не начаты. Новый владелец задач (в статусе Active
или Activating
не начнет над ними работу, пока деактивируемая нода не покинет кластер.
Сам ShardProcessor
не выполняет задачи, а пересылает их дочерним акторам. При выполнении каждой задачи создается новый дочерний актор. Задачи могут выполняться акторами разного типа, поэтому ShardProcessor
поддерживает группы акторов используая параметр ShardTask.group
.
ShardProcessor
гарантирует, что если с некоторым key
и group
задача находится уже в работе у дочернего актора, то он будет ожидать и создаст новый актор только при завершении текущей работы.
Цель всего этого — гарантировать, что задачи идентифицируемые ключем будут выполняться только на конкретной ноде, конкретным актором. Это позволяет работать с Cassandra без использования if update
и paxos
и поддержать монотонно возрастающие ревизии.
На текущий момент имеются два вида исполнителей в HyperStorage:
PrimaryWorker
— основной обработчик, принимает запросы на изменение данных (PUT,PATCH,POST,DELETE).SecondaryWorker
— вторичный (фоновый) обработчик, который:- завершает выполнение транзакции по изменению данных, что включает в себя: индексацию, публикацию событий в Kafka (задачи
BackgroundContentTask
); - выполняет команды
IndexDefTask
по созданию и удалению индекса и делегирует контроль над этим вIndexManager
; - индексирует контент по запросу от
IndexManager
, задачиIndexContentTask
. Это происходит при создании индекса для уже существующих элементов коллекции;
- завершает выполнение транзакции по изменению данных, что включает в себя: индексацию, публикацию событий в Kafka (задачи
Описание полей можно найти в схеме данных. Описание таблиц и их назначений:
content
— здесь хранится содержимое документов и коллекций;transaction
— транзакции по изменению данных;checkpoint
— таблица используется фоновыми обработчиками, которые контроллируют, что все транзакции завершены успешно;index_def
— описание индекса;pending_index
— используется при индексации уже существующего контента, до стадии пока индекс не перейдет в стадию готов (2);index_content
,index_content_ta0
, etc — здесь хранится содержимое коллекций в соответствии с индексом (сортировка и фильтр). Название таблицы выбирается исходя из того, сколько и какого типа полей по которым идет сортировка, где:- первый символ,
t
— текстовое поле,d
— числовое; - второй символ
a
— сортировка по возрастанию,d
— сортировка по убыванию; - третий символ (число) обозначает номер поля.
- первый символ,
Важно, что transaction.partition
является частью PK для этой таблицы. Значение partition
вычисляется как CRC32 от document_uri
в UTF8 формате / MAX_PARTITIONS
, где константа MAX_PARTITIONS
равна 1024 на текущий момент и представляет собой ограничитель для масштабирования. Т.е. нет смысла запускать нод больше чем 1024 не меняя значение этой константы. При изменении константы MAX_PARTITIONS
партиции в таблице transaction
перестанут соответствовать содержимому, что приведет к незавершенным транзакциям без специальной обрабтки этой ситуации.
В обработчике PrimaryWorker
:
- Для любой операции по изменению данных, сначала идет вставка в таблицу
transaction
- Происходит применение изменений к таблице
content
При успешном выполнении обоих пунктов, транзакция считается принятой (но не завершенной). Если выполнится только первый пункт, транзакция не принята и операция по изменению вернет ошибку.
Поле transaction.uuid
помещается в начало списка в content.transaction_list
. content.transaction_list
содержит все транзакции, которые уже применены к content
, но еще не завершены до конца.
После выполнения 1 и 2, возвращаетс ответ об успешности операции и посылается команда на завершение транзакции в обработчик SecondaryWorker
(в фоновом режиме).
SecondaryWorker
индексирует элементы, публикует событие в кафку и как признак завершения транзакции устанавливает полу transaction.completed_at
. Если завершение не будет выполненно до конца по каким-то причинам, или нода упала, ответственность фоновых обработчиков Recovery workers
обнаружить этот факт и отправить команду SecondaryWorker
пока он не сможет завершить транзакцию успешно. При падениях могут повторно публиковаться события в Kafka, и повторно индексироваться то, что уже было проиндексировано.
HotRecoveryWorker
контролирует последние транзакции выбирая все транзакции в периоде -30min .. -2min
, что соответствует настройкам hyper-storage.hot-recovery
и hyper-storage.background-task-timeout
. Если в этом периоде обнаруживается транзакция, которая не выполнена, будет посылаться команда на завершение в SecondaryWorker
.
StaleRecoveryWorker
контролирует последние транзакции выбирая все транзакции в периоде checkpoint-1day ... -30min
, что соответствует настройкам hyper-storage.stale-recovery
. checkpoint
сохраняется отдельно для каждой партиции и продвигается вперед. Это позволит гарантировать выполнение транзакций не старше одного дня, например если они записывались в другом датацентре и кластер касандры их не синхронизировались к нодам с которыми работает револт. За пределами дня, который настраивается такого контроля не будет.
Все запросы на чтение принимаются из Hyperbus
и обрабатываются в HyperbusAdapter
выполняя запросы в Cassandra и не зависят от ShardProcessor
.
- Поддержка materialized view, в том числе:
- объединение документов в виртуальные коллекции;
- трансформация тела элементов и документов;
- шаблонные индексы которые можно применить к
users/{userId}/collection~
- каталог данных, индекс, который позволить браузить содержимое, также нужен для полноценной поддержки шаблонных индексов;
- explain, чтобы было видно, что индекс используется;
- гистограмма и статистика данных для индексов и основных таблиц;
- сохранение версий данных;
- нагрузочный тест и оптимизация производительности;
- кеширование данных и метаданных (информация об индексах);
- поддержать запросы напрямую к индексной таблице;
- поддержать запросы на чтение к таблице с транзакциями (информация о завершении транзакции)
/hyper-storage/transactions/?
; — тесты на отказ (+/- нода под нагрузкой, отказ касандры, отказ кафки, падение ноды); - для akka-cluster использовать protobuf: akka/akka#18371
Запуск из командной строки для теста
sbt '; set connectInput in run := true; set javaOptions in "hyperstorage" += "-Dconfig-files=../nodes-test/nodes.conf:../nodes-test/node-1.conf" ; hyperstorage/runMain com.hypertino.hyperstorage.EntryPoint'