Skip to content

Commit 3acbd77

Browse files
committed
add put state batch
Signed-off-by: Fedor Partanskiy <[email protected]>
1 parent a356b32 commit 3acbd77

File tree

4 files changed

+184
-15
lines changed

4 files changed

+184
-15
lines changed

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ require (
2020
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
2121
gopkg.in/yaml.v3 v3.0.1 // indirect
2222
)
23+
24+
replace github.com/hyperledger/fabric-protos-go-apiv2 => github.com/scientificideas/fabric-protos-go-apiv2 v0.0.0-20240819205904-84edd4adb1f8

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
33
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
44
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
55
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
6-
github.com/hyperledger/fabric-protos-go-apiv2 v0.3.3 h1:Xpd6fzG/KjAOHJsq7EQXY2l+qi/y8muxBaY7R6QWABk=
7-
github.com/hyperledger/fabric-protos-go-apiv2 v0.3.3/go.mod h1:2pq0ui6ZWA0cC8J+eCErgnMDCS1kPOEYVY+06ZAK0qE=
86
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
97
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
108
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -13,6 +11,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
1311
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
1412
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1513
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
14+
github.com/scientificideas/fabric-protos-go-apiv2 v0.0.0-20240819205904-84edd4adb1f8 h1:7hFAzj8zZyoNGo1zWIB5rYpY1GFkxwxKblcFsW8zQWY=
15+
github.com/scientificideas/fabric-protos-go-apiv2 v0.0.0-20240819205904-84edd4adb1f8/go.mod h1:1x9TFdg5b2M9vema7s4EdQZ1qUV4peAtF2jQj8FjLjc=
1616
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
1717
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
1818
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=

shim/handler.go

+166
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ const (
1818
created state = "created" // start state
1919
established state = "established" // connection established
2020
ready state = "ready" // ready for requests
21+
22+
defaultMaxSizePutStateBatch = 100
23+
prefixMetaData = "m"
24+
prefixStateData = "s"
2125
)
2226

2327
// PeerChaincodeStream is the common stream interface for Peer - chaincode communication.
@@ -46,6 +50,11 @@ type Handler struct {
4650
cc Chaincode
4751
// state holds the current state of this handler.
4852
state state
53+
// if you can send the changes in batches.
54+
usePutStateBatch bool
55+
maxSizePutStateBatch uint32
56+
stateDataMutex sync.RWMutex
57+
stateData map[string]map[string]*peer.StateKV
4958

5059
// Multiple queries (and one transaction) with different txids can be executing in parallel for this chaincode
5160
// responseChannels is the channel on which responses are communicated by the shim to the chaincodeStub.
@@ -150,6 +159,7 @@ func newChaincodeHandler(peerChatStream PeerChaincodeStream, chaincode Chaincode
150159
cc: chaincode,
151160
responseChannels: map[string]chan *peer.ChaincodeMessage{},
152161
state: created,
162+
stateData: map[string]map[string]*peer.StateKV{},
153163
}
154164
}
155165

@@ -188,6 +198,11 @@ func (h *Handler) handleInit(msg *peer.ChaincodeMessage) (*peer.ChaincodeMessage
188198
return nil, fmt.Errorf("failed to marshal response: %s", err)
189199
}
190200

201+
err = h.sendBatch(msg.ChannelId, msg.Txid)
202+
if err != nil {
203+
return nil, fmt.Errorf("failed send batch: %s", err)
204+
}
205+
191206
return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
192207
}
193208

@@ -214,6 +229,11 @@ func (h *Handler) handleTransaction(msg *peer.ChaincodeMessage) (*peer.Chaincode
214229
return nil, fmt.Errorf("failed to marshal response: %s", err)
215230
}
216231

232+
err = h.sendBatch(msg.ChannelId, msg.Txid)
233+
if err != nil {
234+
return nil, fmt.Errorf("failed send batch: %s", err)
235+
}
236+
217237
return &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_COMPLETED, Payload: resBytes, Txid: msg.Txid, ChaincodeEvent: stub.chaincodeEvent, ChannelId: stub.ChannelID}, nil
218238
}
219239

@@ -312,6 +332,17 @@ func (h *Handler) handleGetStateMetadata(collection string, key string, channelI
312332

313333
// handlePutState communicates with the peer to put state information into the ledger.
314334
func (h *Handler) handlePutState(collection string, key string, value []byte, channelID string, txid string) error {
335+
if h.usePutStateBatch {
336+
st := h.stateDataByID(channelID, txid)
337+
st[prefixStateData+collection+key] = &peer.StateKV{
338+
Key: key,
339+
Value: value,
340+
Collection: collection,
341+
Type: peer.StateKV_PUT_STATE,
342+
}
343+
return nil
344+
}
345+
315346
// Construct payload for PUT_STATE
316347
payloadBytes := marshalOrPanic(&peer.PutState{Collection: collection, Key: key, Value: value})
317348

@@ -340,6 +371,19 @@ func (h *Handler) handlePutState(collection string, key string, value []byte, ch
340371
func (h *Handler) handlePutStateMetadataEntry(collection string, key string, metakey string, metadata []byte, channelID string, txID string) error {
341372
// Construct payload for PUT_STATE_METADATA
342373
md := &peer.StateMetadata{Metakey: metakey, Value: metadata}
374+
375+
if h.usePutStateBatch {
376+
st := h.stateDataByID(channelID, txID)
377+
st[prefixMetaData+collection+key] = &peer.StateKV{
378+
Key: key,
379+
Collection: collection,
380+
Metadata: md,
381+
Type: peer.StateKV_PUT_STATE_METADATA,
382+
}
383+
384+
return nil
385+
}
386+
343387
payloadBytes := marshalOrPanic(&peer.PutStateMetadata{Collection: collection, Key: key, Metadata: md})
344388

345389
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_METADATA, Payload: payloadBytes, Txid: txID, ChannelId: channelID}
@@ -365,6 +409,16 @@ func (h *Handler) handlePutStateMetadataEntry(collection string, key string, met
365409

366410
// handleDelState communicates with the peer to delete a key from the state in the ledger.
367411
func (h *Handler) handleDelState(collection string, key string, channelID string, txid string) error {
412+
if h.usePutStateBatch {
413+
st := h.stateDataByID(channelID, txid)
414+
st[prefixStateData+collection+key] = &peer.StateKV{
415+
Key: key,
416+
Collection: collection,
417+
Type: peer.StateKV_DEL_STATE,
418+
}
419+
return nil
420+
}
421+
368422
payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
369423
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_DEL_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
370424
// Execute the request and get response
@@ -388,6 +442,16 @@ func (h *Handler) handleDelState(collection string, key string, channelID string
388442

389443
// handlePurgeState communicates with the peer to purge a state from private data
390444
func (h *Handler) handlePurgeState(collection string, key string, channelID string, txid string) error {
445+
if h.usePutStateBatch {
446+
st := h.stateDataByID(channelID, txid)
447+
st[prefixStateData+collection+key] = &peer.StateKV{
448+
Key: key,
449+
Collection: collection,
450+
Type: peer.StateKV_PURGE_PRIVATE_DATA,
451+
}
452+
return nil
453+
}
454+
391455
payloadBytes := marshalOrPanic(&peer.DelState{Collection: collection, Key: key})
392456
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PURGE_PRIVATE_DATA, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
393457
// Execute the request and get response
@@ -409,6 +473,69 @@ func (h *Handler) handlePurgeState(collection string, key string, channelID stri
409473
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
410474
}
411475

476+
// handlePutStateBatch communicates with the peer to put state as batch all changes information into the ledger.
477+
func (h *Handler) handlePutStateBatch(batch *peer.PutStateBatch, channelID string, txid string) error {
478+
// Construct payload for PUT_STATE_BATCH
479+
payloadBytes := marshalOrPanic(batch)
480+
481+
msg := &peer.ChaincodeMessage{Type: peer.ChaincodeMessage_PUT_STATE_BATCH, Payload: payloadBytes, Txid: txid, ChannelId: channelID}
482+
483+
// Execute the request and get response
484+
responseMsg, err := h.callPeerWithChaincodeMsg(msg, channelID, txid)
485+
if err != nil {
486+
return fmt.Errorf("[%s] error sending %s: %s", msg.Txid, peer.ChaincodeMessage_PUT_STATE_BATCH, err)
487+
}
488+
489+
if responseMsg.Type == peer.ChaincodeMessage_RESPONSE {
490+
// Success response
491+
return nil
492+
}
493+
494+
if responseMsg.Type == peer.ChaincodeMessage_ERROR {
495+
// Error response
496+
return fmt.Errorf("%s", responseMsg.Payload[:])
497+
}
498+
499+
// Incorrect chaincode message received
500+
return fmt.Errorf("[%s] incorrect chaincode message %s received. Expecting %s or %s", shorttxid(responseMsg.Txid), responseMsg.Type, peer.ChaincodeMessage_RESPONSE, peer.ChaincodeMessage_ERROR)
501+
}
502+
503+
func (h *Handler) sendBatch(channelID string, txid string) error {
504+
if !h.usePutStateBatch {
505+
return nil
506+
}
507+
508+
st := h.stateDataByID(channelID, txid)
509+
txCtxID := transactionContextID(channelID, txid)
510+
511+
defer func() {
512+
h.stateDataMutex.Lock()
513+
delete(h.stateData, txCtxID)
514+
h.stateDataMutex.Unlock()
515+
}()
516+
517+
batch := &peer.PutStateBatch{}
518+
for _, kv := range st {
519+
batch.Kvs = append(batch.Kvs, kv)
520+
if len(batch.Kvs) >= int(h.maxSizePutStateBatch) {
521+
err := h.handlePutStateBatch(batch, channelID, txid)
522+
if err != nil {
523+
return fmt.Errorf("failed send state batch: %s", err)
524+
}
525+
batch.Kvs = batch.Kvs[:0]
526+
}
527+
}
528+
529+
if len(batch.Kvs) != 0 {
530+
err := h.handlePutStateBatch(batch, channelID, txid)
531+
if err != nil {
532+
return fmt.Errorf("failed send state batch: %s", err)
533+
}
534+
}
535+
536+
return nil
537+
}
538+
412539
func (h *Handler) handleGetStateByRange(collection, startKey, endKey string, metadata []byte,
413540
channelID string, txid string) (*peer.QueryResponse, error) {
414541
// Send GET_STATE_BY_RANGE message to peer chaincode support
@@ -655,6 +782,23 @@ func (h *Handler) handleEstablished(msg *peer.ChaincodeMessage) error {
655782
}
656783

657784
h.state = ready
785+
if len(msg.Payload) == 0 {
786+
return nil
787+
}
788+
789+
ccAdditionalParams := &peer.ChaincodeAdditionalParams{}
790+
err := proto.Unmarshal(msg.Payload, ccAdditionalParams)
791+
if err != nil {
792+
return nil
793+
}
794+
795+
h.usePutStateBatch = ccAdditionalParams.UsePutStateBatch
796+
h.maxSizePutStateBatch = ccAdditionalParams.MaxSizePutStateBatch
797+
798+
if h.usePutStateBatch && h.maxSizePutStateBatch < defaultMaxSizePutStateBatch {
799+
h.maxSizePutStateBatch = defaultMaxSizePutStateBatch
800+
}
801+
658802
return nil
659803
}
660804

@@ -697,6 +841,28 @@ func (h *Handler) handleMessage(msg *peer.ChaincodeMessage, errc chan error) err
697841
return nil
698842
}
699843

844+
func (h *Handler) stateDataByID(channelID string, txID string) map[string]*peer.StateKV {
845+
txCtxID := transactionContextID(channelID, txID)
846+
847+
h.stateDataMutex.RLock()
848+
st, ok := h.stateData[txCtxID]
849+
h.stateDataMutex.RUnlock()
850+
if ok {
851+
return st
852+
}
853+
854+
h.stateDataMutex.Lock()
855+
defer h.stateDataMutex.Unlock()
856+
st, ok = h.stateData[txCtxID]
857+
if ok {
858+
return st
859+
}
860+
861+
st = make(map[string]*peer.StateKV)
862+
h.stateData[txCtxID] = st
863+
return st
864+
}
865+
700866
// marshalOrPanic attempts to marshal the provided protobbuf message but will panic
701867
// when marshaling fails instead of returning an error.
702868
func marshalOrPanic(msg proto.Message) []byte {

shim/handler_test.go

+14-13
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func TestNewHandler_CreatedState(t *testing.T) {
4949
cc: cc,
5050
responseChannels: map[string]chan *peer.ChaincodeMessage{},
5151
state: created,
52+
stateData: map[string]map[string]*peer.StateKV{},
5253
}
5354

5455
handler := newChaincodeHandler(chatStream, cc)
@@ -125,7 +126,7 @@ func TestHandlerState(t *testing.T) {
125126
}
126127
err := handler.handleMessage(test.msg, nil)
127128
if test.expectedErr != "" {
128-
assert.Contains(t, err.Error(), test.expectedErr)
129+
assert.ErrorContains(t, err, test.expectedErr)
129130
} else {
130131
assert.NoError(t, err)
131132
}
@@ -217,7 +218,7 @@ func TestHandleMessage(t *testing.T) {
217218

218219
err := handler.handleMessage(test.msg, nil)
219220
if test.expectedErr != "" {
220-
assert.Contains(t, err.Error(), test.expectedErr)
221+
assert.ErrorContains(t, err, test.expectedErr)
221222
} else {
222223
if err != nil {
223224
t.Fatalf("Unexpected error for '%s': %s", test.name, err)
@@ -264,36 +265,36 @@ func TestHandlePeerCalls(t *testing.T) {
264265
// force error by removing responseChannels
265266
h.responseChannels = nil
266267
_, err = h.handleGetState("col", "key", "channel", "txid")
267-
assert.Contains(t, err.Error(), "[txid] error sending GET_STATE")
268+
assert.ErrorContains(t, err, "[txid] error sending GET_STATE")
268269

269270
_, err = h.handleGetPrivateDataHash("col", "key", "channel", "txid")
270-
assert.Contains(t, err.Error(), "[txid] error sending GET_PRIVATE_DATA_HASH")
271+
assert.ErrorContains(t, err, "[txid] error sending GET_PRIVATE_DATA_HASH")
271272

272273
_, err = h.handleGetStateMetadata("col", "key", "channel", "txid")
273-
assert.Contains(t, err.Error(), "[txid] error sending GET_STATE_METADATA")
274+
assert.ErrorContains(t, err, "[txid] error sending GET_STATE_METADATA")
274275

275276
err = h.handlePutState("col", "key", []byte{}, "channel", "txid")
276-
assert.Contains(t, err.Error(), "[txid] error sending PUT_STATE")
277+
assert.ErrorContains(t, err, "[txid] error sending PUT_STATE")
277278

278279
err = h.handlePutStateMetadataEntry("col", "key", "mkey", []byte{}, "channel", "txid")
279-
assert.Contains(t, err.Error(), "[txid] error sending PUT_STATE_METADATA")
280+
assert.ErrorContains(t, err, "[txid] error sending PUT_STATE_METADATA")
280281

281282
err = h.handleDelState("col", "key", "channel", "txid")
282-
assert.Contains(t, err.Error(), "[txid] error sending DEL_STATE")
283+
assert.ErrorContains(t, err, "[txid] error sending DEL_STATE")
283284

284285
_, err = h.handleGetStateByRange("col", "start", "end", []byte{}, "channel", "txid")
285-
assert.Contains(t, err.Error(), "[txid] error sending GET_STATE_BY_RANGE")
286+
assert.ErrorContains(t, err, "[txid] error sending GET_STATE_BY_RANGE")
286287

287288
_, err = h.handleQueryStateNext("id", "channel", "txid")
288-
assert.Contains(t, err.Error(), "cannot create response channel")
289+
assert.ErrorContains(t, err, "cannot create response channel")
289290

290291
_, err = h.handleQueryStateClose("id", "channel", "txid")
291-
assert.Contains(t, err.Error(), "cannot create response channel")
292+
assert.ErrorContains(t, err, "cannot create response channel")
292293

293294
_, err = h.handleGetQueryResult("col", "query", []byte{}, "channel", "txid")
294-
assert.Contains(t, err.Error(), "[txid] error sending GET_QUERY_RESULT")
295+
assert.ErrorContains(t, err, "[txid] error sending GET_QUERY_RESULT")
295296

296297
_, err = h.handleGetHistoryForKey("key", "channel", "txid")
297-
assert.Contains(t, err.Error(), "cannot create response channel")
298+
assert.ErrorContains(t, err, "cannot create response channel")
298299

299300
}

0 commit comments

Comments
 (0)