diff --git a/node/node.go b/node/node.go index 96fa8b967e..a1f37f7a1b 100644 --- a/node/node.go +++ b/node/node.go @@ -52,7 +52,8 @@ type Node struct { DataExchange exchange.Interface DAG format.DAGService // p2p protocols - PubSub *pubsub.PubSub + PubSub *pubsub.PubSub + // services ShareServ share.Service // not optional HeaderServ *header.Service // not optional StateServ *state.Service // not optional diff --git a/service/state/core_access.go b/service/state/core_access.go index 0796af7999..5320d17794 100644 --- a/service/state/core_access.go +++ b/service/state/core_access.go @@ -103,3 +103,14 @@ func (ca *CoreAccessor) SubmitTxWithBroadcastMode( } return txResp.TxResponse, nil } + +func (ca *CoreAccessor) KeyringSigner() *apptypes.KeyringSigner { + return ca.signer +} + +func (ca *CoreAccessor) Conn() (*grpc.ClientConn, error) { + if ca.coreConn == nil { + return nil, fmt.Errorf("no running gRPC connection") + } + return ca.coreConn, nil +} diff --git a/service/state/interface.go b/service/state/interface.go index 05de4b8b3b..4e03fc2d38 100644 --- a/service/state/interface.go +++ b/service/state/interface.go @@ -2,6 +2,10 @@ package state import ( "context" + + "google.golang.org/grpc" + + apptypes "github.com/celestiaorg/celestia-app/x/payment/types" ) // Accessor represents the behaviors necessary for a user to @@ -12,6 +16,13 @@ type Accessor interface { Start(context.Context) error // Stop stops the state Accessor. Stop(context.Context) error + + // KeyringSigner returns the KeyringSigner used by the Accessor. + KeyringSigner() *apptypes.KeyringSigner + // Conn returns a gRPC connection to node that serves state-related + // requests. + Conn() (*grpc.ClientConn, error) + // Balance retrieves the Celestia coin balance // for the node's account/signer. Balance(ctx context.Context) (*Balance, error) diff --git a/service/state/pfd.go b/service/state/pfd.go new file mode 100644 index 0000000000..ed736ec502 --- /dev/null +++ b/service/state/pfd.go @@ -0,0 +1,101 @@ +package state + +import ( + "context" + "fmt" + + "github.com/cosmos/cosmos-sdk/x/auth/signing" + + apptypes "github.com/celestiaorg/celestia-app/x/payment/types" + "github.com/celestiaorg/nmt/namespace" +) + +var ( + // shareSizes includes all the possible share sizes of the given data + // that the signer must sign over. + shareSizes = []uint64{16, 32, 64, 128} +) + +// PayForData is an alias to the PayForMessage packet. +type PayForData = apptypes.MsgWirePayForMessage + +// SubmitPayForData builds, signs and submits a PayForData message. +func (s *Service) SubmitPayForData( + ctx context.Context, + nID namespace.ID, + data []byte, + gasLim uint64, + maxSize uint64, +) (*TxResponse, error) { + pfd, err := s.BuildPayForData(ctx, nID, data, gasLim, maxSize) + if err != nil { + return nil, err + } + + signed, err := s.SignPayForData(pfd, apptypes.SetGasLimit(gasLim)) + if err != nil { + return nil, err + } + + signer := s.accessor.KeyringSigner() + rawTx, err := signer.EncodeTx(signed) + if err != nil { + return nil, err + } + + return s.accessor.SubmitTx(ctx, rawTx) +} + +// BuildPayForData builds a PayForData message. +func (s *Service) BuildPayForData( + ctx context.Context, + nID namespace.ID, + message []byte, + gasLim uint64, + maxSize uint64, +) (*PayForData, error) { + // create the raw WirePayForMessage transaction + wpfmMsg, err := apptypes.NewWirePayForMessage(nID, message, shareSizes...) + if err != nil { + return nil, err + } + // TODO @renaynay: as a result of `padMessage`, the following is observed: + // {wpfmMsg.MessageSize: 256, len(message): 5} + if wpfmMsg.GetMessageSize() > maxSize { + return nil, fmt.Errorf("message size %d cannot exceed specified max size %d", wpfmMsg.GetMessageSize(), maxSize) + } + + // get signer and conn info + signer := s.accessor.KeyringSigner() + conn, err := s.accessor.Conn() + if err != nil { + return nil, err + } + + // query for account information necessary to sign a valid tx + err = signer.QueryAccountNumber(ctx, conn) + if err != nil { + return nil, err + } + + // generate the signatures for each `MsgPayForMessage` using the `KeyringSigner`, + // then set the gas limit for the tx + gasLimOption := apptypes.SetGasLimit(gasLim) + err = wpfmMsg.SignShareCommitments(signer, gasLimOption) + if err != nil { + return nil, err + } + + return wpfmMsg, nil +} + +// SignPayForData signs the given PayForData message. +func (s *Service) SignPayForData(pfd *PayForData, gasLimOption apptypes.TxBuilderOption) (signing.Tx, error) { + signer := s.accessor.KeyringSigner() + // Build and sign the final `WirePayForMessage` tx that now contains the signatures + // for potential `MsgPayForMessage`s + return signer.BuildSignedTx( + gasLimOption(signer.NewTxBuilder()), + pfd, + ) +} diff --git a/service/state/rpc.go b/service/state/rpc.go index 57eb69353c..70bb34c847 100644 --- a/service/state/rpc.go +++ b/service/state/rpc.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "io/ioutil" "net/http" "github.com/cosmos/cosmos-sdk/types" @@ -13,17 +14,25 @@ import ( "github.com/celestiaorg/celestia-node/node/rpc" ) -var log = logging.Logger("state/rpc") - var ( addrKey = "address" txKey = "tx" + + log = logging.Logger("state/rpc") ) +type submitPFDRequest struct { + NamespaceID string `json:"namespace_id"` + Data string `json:"data"` + GasLimit uint64 `json:"gas_limit"` + MaxDataSize uint64 `json:"max_data_size"` +} + func (s *Service) RegisterEndpoints(rpc *rpc.Server) { rpc.RegisterHandlerFunc("/balance", s.handleBalanceRequest, http.MethodGet) rpc.RegisterHandlerFunc(fmt.Sprintf("/balance/{%s}", addrKey), s.handleBalanceForAddrRequest, http.MethodGet) rpc.RegisterHandlerFunc(fmt.Sprintf("/submit_tx/{%s}", txKey), s.handleSubmitTx, http.MethodPost) + rpc.RegisterHandlerFunc("/submit_pfd", s.handleSubmitPFD, http.MethodPost) } func (s *Service) handleBalanceRequest(w http.ResponseWriter, r *http.Request) { @@ -101,3 +110,51 @@ func (s *Service) handleSubmitTx(w http.ResponseWriter, r *http.Request) { log.Errorw("writing /balance response", "err", err) } } + +func (s *Service) handleSubmitPFD(w http.ResponseWriter, r *http.Request) { + // parse body from request + raw, err := ioutil.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + log.Errorw("serving /submit_pfd request", "err", err) + return + } + // decode request + req := new(submitPFDRequest) + err = json.Unmarshal(raw, req) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + log.Errorw("serving /submit_pfd request", "err", err) + return + } + + nID, err := hex.DecodeString(req.NamespaceID) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + log.Errorw("serving /submit_pfd request", "err", err) + return + } + data := []byte(req.Data) + + // perform request + txResp, err := s.SubmitPayForData(r.Context(), nID, data, req.GasLimit, req.MaxDataSize) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, werr := w.Write([]byte(err.Error())) + if werr != nil { + log.Errorw("writing /submit_pfd response", "err", werr) + } + log.Errorw("serving /submit_pfd request", "err", err) + return + } + resp, err := json.Marshal(txResp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Errorw("serving /submit_pfd request", "err", err) + return + } + _, err = w.Write(resp) + if err != nil { + log.Errorw("writing /submit_pfd response", "err", err) + } +}