diff --git a/modules/host.go b/modules/host.go index 666242db1a..bea896b0d8 100644 --- a/modules/host.go +++ b/modules/host.go @@ -105,6 +105,7 @@ type ( RenewCalls uint64 `json:"renewcalls"` ReviseCalls uint64 `json:"revisecalls"` SettingsCalls uint64 `json:"settingscalls"` + MetadataCalls uint64 `json:"metadatacalls"` UnrecognizedCalls uint64 `json:"unrecognizedcalls"` } diff --git a/modules/host/host.go b/modules/host/host.go index acd20d231f..9c9425957d 100644 --- a/modules/host/host.go +++ b/modules/host/host.go @@ -122,6 +122,7 @@ type Host struct { atomicRenewCalls uint64 atomicReviseCalls uint64 atomicSettingsCalls uint64 + atomicMetadataCalls uint64 atomicUnrecognizedCalls uint64 // Error management. There are a few different types of errors returned by diff --git a/modules/host/negotiatemetadata.go b/modules/host/negotiatemetadata.go new file mode 100644 index 0000000000..c0e78ef5e2 --- /dev/null +++ b/modules/host/negotiatemetadata.go @@ -0,0 +1,59 @@ +package host + +import ( + "errors" + "net" + + "github.com/NebulousLabs/Sia/encoding" + "github.com/NebulousLabs/Sia/modules" +) + +// managedRPCMetadata accepts a request to get list of sector ids. +func (h *Host) managedRPCMetadata(conn net.Conn) error { + // Perform the file contract revision exchange, giving the renter the most + // recent file contract revision and getting the storage obligation that + // will be used to get sector ids. + _, so, err := h.managedRPCRecentRevision(conn) + if err != nil { + return extendErr("RPCRecentRevision failed: ", err) + } + // The storage obligation is received with a lock on it. Defer a call to + // unlock the storage obligation. + defer func() { + h.managedUnlockStorageObligation(so.id()) + }() + // Receive boundaries of so.SectorRoots to return. + var begin, end uint64 + err = encoding.ReadObject(conn, &begin, 8) + if err != nil { + return extendErr("unable to read 'begin': ", ErrorConnection(err.Error())) + } + err = encoding.ReadObject(conn, &end, 8) + if err != nil { + return extendErr("unable to read 'end': ", ErrorConnection(err.Error())) + } + if end < begin { + err = errors.New("Range error") + modules.WriteNegotiationRejection(conn, err) + return err + } + if end > uint64(len(so.SectorRoots)) { + err = errors.New("Range out of bounds error") + modules.WriteNegotiationRejection(conn, err) + return err + } + if end-begin > modules.NegotiateMetadataMaxSliceSize { + err = errors.New("The range is too long") + modules.WriteNegotiationRejection(conn, err) + return err + } + if err = modules.WriteNegotiationAcceptance(conn); err != nil { + return extendErr("failed to write [begin,end) acceptance: ", ErrorConnection(err.Error())) + } + // Write roots of all sectors. + err = encoding.WriteObject(conn, so.SectorRoots[begin:end]) + if err != nil { + return extendErr("cound not write sectors: ", ErrorConnection(err.Error())) + } + return nil +} diff --git a/modules/host/network.go b/modules/host/network.go index 5008942b39..98ca793bbf 100644 --- a/modules/host/network.go +++ b/modules/host/network.go @@ -281,6 +281,9 @@ func (h *Host) threadedHandleConn(conn net.Conn) { case modules.RPCDownload: atomic.AddUint64(&h.atomicDownloadCalls, 1) err = extendErr("incoming RPCDownload failed: ", h.managedRPCDownload(conn)) + case modules.RPCMetadata: + atomic.AddUint64(&h.atomicMetadataCalls, 1) + err = extendErr("incoming RPCMetadata failed: ", h.managedRPCMetadata(conn)) case modules.RPCRenewContract: atomic.AddUint64(&h.atomicRenewCalls, 1) err = extendErr("incoming RPCRenewContract failed: ", h.managedRPCRenewContract(conn)) diff --git a/modules/negotiate.go b/modules/negotiate.go index 0b197d68fa..df3983bee3 100644 --- a/modules/negotiate.go +++ b/modules/negotiate.go @@ -30,6 +30,13 @@ const ( // the negotiation. NegotiateDownloadTime = 600 * time.Second + // NegotiateMetadataTime establishes the minimum amount of time that + // the connection deadline is expected to be set to when a metadata + // is being requested from the host. The deadline is long + // enough that the connection should be successful even if both parties are + // running Tor. + NegotiateMetadataTime = 120 * time.Second + // NegotiateFileContractRevisionTime defines the minimum amount of time // that the renter and host have to negotiate a file contract revision. The // time is set high enough that a full 4MB can be piped through a @@ -154,6 +161,9 @@ var ( // RPCDownload is the specifier for downloading a file from a host. RPCDownload = types.Specifier{'D', 'o', 'w', 'n', 'l', 'o', 'a', 'd', 2} + // RPCMetadata is the specifier for getting the list of sector roots. + RPCMetadata = types.Specifier{'M', 'e', 't', 'a', 'd', 'a', 't', 'a'} + // RPCFormContract is the specifier for forming a contract with a host. RPCFormContract = types.Specifier{'F', 'o', 'r', 'm', 'C', 'o', 'n', 't', 'r', 'a', 'c', 't', 2} @@ -177,6 +187,14 @@ var ( Standard: uint64(1 << 22), // 4 MiB Testing: uint64(1 << 12), // 4 KiB }).(uint64) + + // NegotiateMetadataMaxSliceSize establishes the maximum allowed length + // of the list of sectors returned by the Metadata RPC. + NegotiateMetadataMaxSliceSize = build.Select(build.Var{ + Dev: uint64(1 << 17), + Standard: uint64(1 << 17), + Testing: uint64(1 << 4), + }).(uint64) ) type ( diff --git a/modules/renter/contractor/host_integration_test.go b/modules/renter/contractor/host_integration_test.go index 070545750f..5263b651bd 100644 --- a/modules/renter/contractor/host_integration_test.go +++ b/modules/renter/contractor/host_integration_test.go @@ -18,6 +18,7 @@ import ( "github.com/NebulousLabs/Sia/modules/host" "github.com/NebulousLabs/Sia/modules/miner" "github.com/NebulousLabs/Sia/modules/renter/hostdb" + "github.com/NebulousLabs/Sia/modules/renter/proto" "github.com/NebulousLabs/Sia/modules/transactionpool" modWallet "github.com/NebulousLabs/Sia/modules/wallet" "github.com/NebulousLabs/Sia/types" @@ -670,3 +671,114 @@ func TestContractPresenceLeak(t *testing.T) { t.Fatalf("Expected to get equal errors, got %q and %q.", errors[0], errors[1]) } } + +// TestIntegrationMetadata tests the Metadata RPC. +func TestIntegrationMetadata(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + t.Parallel() + // create testing trio + h, c, _, err := newTestingTrio(t.Name()) + if err != nil { + t.Fatal(err) + } + defer h.Close() + defer c.Close() + + // get the host's entry from the db + hostEntry, ok := c.hdb.Host(h.PublicKey()) + if !ok { + t.Fatal("no entry for host in db") + } + + // form a contract with the host + contract, err := c.managedNewContract(hostEntry, types.SiacoinPrecision.Mul64(10), c.blockHeight+100) + if err != nil { + t.Fatal(err) + } + sc, has := c.staticContracts.Acquire(contract.ID) + if !has { + t.Fatal("c.staticContracts.Acquire returned false") + } + secketKey := sc.Metadata().SecretKey + windowStart := sc.Metadata().EndHeight + c.staticContracts.Return(sc) + + // get revision and no sector ids from the host + lastRevision, _, err := proto.GetMetadata(hostEntry, contract.ID, secketKey, windowStart, 0, 0, nil) + if err != nil { + t.Fatalf("RPCMetadata returned error: %v", err) + } + wantSize := uint64(0) + if lastRevision.NewFileSize != wantSize { + t.Errorf("lastRevision.NewFileSize = %d, want %d", lastRevision.NewFileSize, wantSize) + } + + n := modules.NegotiateMetadataMaxSliceSize * 2 + + // revise the contract + editor, err := c.Editor(contract.HostPublicKey, nil) + if err != nil { + t.Fatal(err) + } + var want []crypto.Hash + for i := 0; i < int(n); i++ { + data := fastrand.Bytes(int(modules.SectorSize)) + root, err := editor.Upload(data) + if err != nil { + t.Fatal(err) + } + want = append(want, root) + } + err = editor.Close() + if err != nil { + t.Fatal(err) + } + + // check correct ranges + correctRanges := []struct{ begin, end uint64 }{ + {0, 0}, + {0, 1}, + {0, 2}, + {0, modules.NegotiateMetadataMaxSliceSize}, + {1, modules.NegotiateMetadataMaxSliceSize + 1}, + {modules.NegotiateMetadataMaxSliceSize, 2 * modules.NegotiateMetadataMaxSliceSize}, + {uint64(n - 1), uint64(n)}, + {uint64(n - 1), uint64(n - 1)}, + {uint64(n), uint64(n)}, + } + for _, r := range correctRanges { + // get revision and sector IDs from the host + lastRevision, got, err := proto.GetMetadata(hostEntry, contract.ID, secketKey, windowStart, r.begin, r.end, nil) + size := r.end - r.begin + if err != nil { + t.Fatalf("RPCMetadata returned error for case %v: %v", r, err) + } + wantSize = n * modules.SectorSize + if lastRevision.NewFileSize != wantSize { + t.Errorf("case %v, lastRevision.NewFileSize = %d, want %d", r, lastRevision.NewFileSize, wantSize) + } + if uint64(len(got)) != size { + t.Fatalf("case %v, list length: want %d, got %d", r, size, len(got)) + } + for i := r.begin; i < r.end; i++ { + if got[i-r.begin] != want[i] { + t.Errorf("RPCMetadata returned wrong sector id for case %v for sector %d", r, i) + } + } + } + + // check incorrect ranges + incorrectRanges := []struct{ begin, end uint64 }{ + {5, 4}, + {0, modules.NegotiateMetadataMaxSliceSize + 1}, + {uint64(n - 1), uint64(n + 1)}, + } + for _, r := range incorrectRanges { + _, _, err := proto.GetMetadata(hostEntry, contract.ID, secketKey, windowStart, r.begin, r.end, nil) + if err == nil { + t.Fatalf("RPCMetadata succeeded for case %v, want error", r) + } + } +} diff --git a/modules/renter/proto/metadata.go b/modules/renter/proto/metadata.go new file mode 100644 index 0000000000..7ea3d725ee --- /dev/null +++ b/modules/renter/proto/metadata.go @@ -0,0 +1,57 @@ +package proto + +import ( + "errors" + "net" + "time" + + "github.com/NebulousLabs/Sia/crypto" + "github.com/NebulousLabs/Sia/encoding" + "github.com/NebulousLabs/Sia/modules" + "github.com/NebulousLabs/Sia/types" +) + +// GetMetadata downloads sector IDs from the host. +func GetMetadata(host modules.HostDBEntry, fcid types.FileContractID, sk crypto.SecretKey, windowStart types.BlockHeight, begin, end uint64, cancel <-chan struct{}) (lastRevision types.FileContractRevision, ids []crypto.Hash, err error) { + conn, err := (&net.Dialer{ + Cancel: cancel, + Timeout: 15 * time.Second, + }).Dial("tcp", string(host.NetAddress)) + if err != nil { + return + } + defer conn.Close() + // allot 2 minutes for RPC request + revision exchange + extendDeadline(conn, modules.NegotiateMetadataTime) + if err = encoding.WriteObject(conn, modules.RPCMetadata); err != nil { + err = errors.New("couldn't initiate RPC: " + err.Error()) + return + } + lastRevision, err = getRecentRevision(conn, fcid, sk, windowStart, host.Version) + if err != nil { + return + } + if err = encoding.WriteObject(conn, begin); err != nil { + err = errors.New("unable to write 'begin': " + err.Error()) + return + } + if err = encoding.WriteObject(conn, end); err != nil { + err = errors.New("unable to write 'end': " + err.Error()) + return + } + // read acceptance + if err = modules.ReadNegotiationAcceptance(conn); err != nil { + err = errors.New("host did not accept [begin,end): " + err.Error()) + return + } + numSectors := end - begin + if err = encoding.ReadObject(conn, &ids, numSectors*crypto.HashSize+8); err != nil { + err = errors.New("unable to read 'ids': " + err.Error()) + return + } + if uint64(len(ids)) != end-begin { + err = errors.New("the host returned too short list of sector IDs") + return + } + return +}