Memphis{dev} is an open-source real-time data processing platform
that provides end-to-end support for in-app streaming use cases, built on the Memphis distributed message broker.
The Memphis platform requires zero ops, enables rapid development and significant cost reduction,
removes coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.
```shell
$ pip3 install memphis-py
```
```python
import asyncio
from memphis import Memphis, Headers
from memphis import retention_types, storage_types
```
First, create a Memphis object, then connect to the broker using memphis.connect.
```python
async def main():
    try:
        memphis = Memphis()
        await memphis.connect(
            host="<memphis-host>",
            username="<application-type username>",
            connection_token="<broker-token>",
            port="<port>",  # defaults to 6666
            reconnect=True,  # defaults to True
            max_reconnect=3,  # defaults to 3
            reconnect_interval_ms=1500,  # defaults to 1500
            timeout_ms=1500,  # defaults to 1500
            # for a TLS connection:
            key_file='<key-client.pem>',
            cert_file='<cert-client.pem>',
            ca_file='<rootCA.pem>'
        )
        ...
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())
```
Once connected, the full set of Memphis features is available.
To disconnect from Memphis, call close() on the memphis object:
```python
await memphis.close()
```
If the station already exists, nothing happens: the new configuration will not be applied.
```python
station = await memphis.station(
    name="<station-name>",
    schema_name="<schema-name>",
    retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
    retention_value=604800, # defaults to 604800
    storage_type=storage_types.DISK, # storage_types.DISK/storage_types.MEMORY. Defaults to DISK
    replicas=1, # defaults to 1
    idempotency_window_ms=120000, # defaults to 2 minutes
    send_poison_msg_to_dls=True, # defaults to True
    send_schema_failed_msg_to_dls=True # defaults to True
)
```
Memphis currently supports the following retention types:

`memphis.retention_types.MAX_MESSAGE_AGE_SECONDS`
Every message persists for the number of seconds set in the retention value.
`memphis.retention_types.MESSAGES`
Once the maximum number of saved messages (set in the retention value) is reached, the oldest messages are deleted.
`memphis.retention_types.BYTES`
Once the maximum number of saved bytes (set in the retention value) is reached, the oldest messages are deleted.
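As a quick sanity check of the units: with `MAX_MESSAGE_AGE_SECONDS`, the default retention value of 604800 corresponds to seven days. A minimal arithmetic check (not Memphis API code):

```python
# 604800 seconds is exactly 7 days, the default retention window.
SECONDS_PER_DAY = 24 * 60 * 60
assert 7 * SECONDS_PER_DAY == 604800
```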
Memphis currently supports the following storage types:

`memphis.storage_types.DISK`
Messages persist on disk.
`memphis.storage_types.MEMORY`
Messages persist in main memory.
Destroying a station removes all of its resources (including its producers and consumers):

```python
await station.destroy()
```
```python
await memphis.attach_schema("<schema-name>", "<station-name>")
```

```python
await memphis.detach_schema("<station-name>")
```
The most common client operations are produce, to send messages, and consume, to receive them.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station, unless they belong to a consumer group, in which case messages are spread across all members of the group.
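As a sketch of the consumer-group behavior (this assumes an already-connected `memphis` object; the consumer and group names here are illustrative, not part of the API):

```python
# Hypothetical example: both consumers join consumer_group "billing",
# so the station's messages are spread across the two of them.
consumer_a = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="worker-a",
    consumer_group="billing",
)
consumer_b = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="worker-b",
    consumer_group="billing",  # same group -> load-balanced delivery
)
```

Consumers created with distinct group names would each receive every message instead.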
Memphis messages are payload agnostic; payloads are bytearray.
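Because payloads are bytearray, structured data has to be serialized before producing and deserialized after consuming. A minimal sketch using the standard json module (the record contents are illustrative):

```python
import json

# Serialize an application dict into a bytearray payload for produce().
record = {"id": 1, "event": "signup"}
payload = bytearray(json.dumps(record), "utf-8")

# A consumer receives the same bytes from msg.get_data() and can decode them.
decoded = json.loads(bytes(payload))
assert decoded == record
```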
To stop receiving messages, call consumer.destroy(). Destroy terminates regardless of whether there are messages in flight for the client.
```python
producer = await memphis.producer(station_name="<station-name>", producer_name="<producer-name>", generate_random_suffix=False)
```
```python
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
    ack_wait_sec=15)  # defaults to 15
```
```python
headers = Headers()
headers.add("key", "value")

await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
    headers=headers)  # defaults to {}
```
With async produce, your application does not wait for the broker's acknowledgement. Use it only if you can tolerate data loss.
```python
await producer.produce(
    message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema)
    headers={}, async_produce=True)
```
Stations are idempotent by default for 2 minutes (configurable); idempotency is achieved by attaching a message ID.
```python
await producer.produce(
    message='bytearray/protobuf class/dict', # bytes / protobuf class (schema validated station - protobuf) or bytes/dict (schema validated station - json schema)
    headers={},
    async_produce=True,
    msg_id="123")
```
```python
await producer.destroy()
```
```python
consumer = await memphis.consumer(
    station_name="<station-name>",
    consumer_name="<consumer-name>",
    consumer_group="<group-name>",  # defaults to the consumer name
    pull_interval_ms=1000,  # defaults to 1000
    batch_size=10,  # defaults to 10
    batch_max_time_to_wait_ms=5000,  # defaults to 5000
    max_ack_time_ms=30000,  # defaults to 30000
    max_msg_deliveries=10,  # defaults to 10
    generate_random_suffix=False,
    start_consume_from_sequence=1,  # start consuming from a specific sequence. defaults to 1
    last_messages=-1  # consume the last N messages. defaults to -1 (all messages in the station)
)
```
Once all the messages in the station have been consumed, msg_handler will receive the error Memphis: TimeoutError.
```python
async def msg_handler(msgs, error):
    for msg in msgs:
        print("message: ", msg.get_data())
        await msg.ack()
    if error:
        print(error)

consumer.consume(msg_handler)
```
Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer or consumer group:
```python
await message.ack()
```
Get headers per message:

```python
headers = message.get_headers()
```
### Get message sequence number
```python
sequence_number = msg.get_sequence_number()
```
```python
await consumer.destroy()
```