From 8572865ed7d96d13dfa85f420d1913cb2379d30b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 11 Aug 2020 16:51:09 +0100 Subject: [PATCH] in-memory journal to aid sync mining. --- lotus-soup/go.mod | 15 +- lotus-soup/go.sum | 36 +++ lotus-soup/testkit/memjournal/memory.go | 249 +++++++++++++++++++ lotus-soup/testkit/memjournal/memory_test.go | 185 ++++++++++++++ 4 files changed, 479 insertions(+), 6 deletions(-) create mode 100644 lotus-soup/testkit/memjournal/memory.go create mode 100644 lotus-soup/testkit/memjournal/memory_test.go diff --git a/lotus-soup/go.mod b/lotus-soup/go.mod index 6185fe8b..84907aff 100644 --- a/lotus-soup/go.mod +++ b/lotus-soup/go.mod @@ -8,13 +8,13 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/drand/drand v1.0.3-0.20200714175734-29705eaf09d4 github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef - github.com/filecoin-project/go-fil-markets v0.5.3 + github.com/filecoin-project/go-fil-markets v0.5.4 github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b - github.com/filecoin-project/lotus v0.4.3-0.20200805153606-8458f04a6b73 - github.com/filecoin-project/sector-storage v0.0.0-20200803184904-3cab915fd225 - github.com/filecoin-project/specs-actors v0.8.6 - github.com/filecoin-project/storage-fsm v0.0.0-20200730122205-d423ae90d8d4 + github.com/filecoin-project/lotus v0.4.3-0.20200811132800-b534ab9d3c8a + github.com/filecoin-project/sector-storage v0.0.0-20200805173933-deec7a2658d4 + github.com/filecoin-project/specs-actors v0.8.7-0.20200805174427-9d42fb163883 + github.com/filecoin-project/storage-fsm v0.0.0-20200805013058-9d9ea4e6331f github.com/google/uuid v1.1.1 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-multierror v1.1.0 @@ -28,13 +28,16 @@ require ( github.com/ipfs/go-unixfs v0.2.4 github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae github.com/kpacha/opencensus-influxdb v0.0.0-20181102202715-663e2683a27c - github.com/libp2p/go-libp2p v0.10.2 + github.com/libp2p/go-libp2p v0.10.3 github.com/libp2p/go-libp2p-core v0.6.1 github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6 github.com/multiformats/go-multiaddr v0.2.2 github.com/multiformats/go-multiaddr-net v0.1.5 + github.com/raulk/clock v1.1.0 + github.com/stretchr/testify v1.6.1 github.com/testground/sdk-go v0.2.3-0.20200706132230-6a65ddac2d8c go.opencensus.io v0.22.4 + go.uber.org/fx v1.9.0 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 ) diff --git a/lotus-soup/go.sum b/lotus-soup/go.sum index c091cb8f..7f003e68 100644 --- a/lotus-soup/go.sum +++ b/lotus-soup/go.sum @@ -230,8 +230,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY= github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= +github.com/filecoin-project/chain-validation v0.0.6-0.20200720093255-843129967fdf/go.mod h1:9xZvimiD8wsZbTNTUoACMPzXj4/fpIxeZBV2YjQcLhI= github.com/filecoin-project/chain-validation v0.0.6-0.20200731192946-f90377ebe789 h1:/q6q6xNQWymV7qVXF2k5poF0QaWdJAqU00LacgANCuM= github.com/filecoin-project/chain-validation v0.0.6-0.20200731192946-f90377ebe789/go.mod h1:JICNIbIEZ+qNJ/PQlHxjei6SaeBbW+yV2r4BcShsXfI= +github.com/filecoin-project/chain-validation v0.0.6-0.20200807023228-a084d8d9919e/go.mod h1:jtOqSVob11urlSIM8Gw3XIrqmiqb+fBr7ZBIbOhzGPI= github.com/filecoin-project/go-address v0.0.0-20200107215422-da8eea2842b5/go.mod h1:SAOwJoakQ8EPjwNIsiakIQKsoKdkcbx8U3IapgCg9R0= github.com/filecoin-project/go-address v0.0.2-0.20200218010043-eb9bb40ed5be/go.mod h1:SAOwJoakQ8EPjwNIsiakIQKsoKdkcbx8U3IapgCg9R0= github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef h1:Wi5E+P1QfHP8IF27eUiTx5vYfqQZwfPxzq3oFEq8w8U= @@ -245,6 +247,7 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20200731171407-e559a0579161 github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20200731171407-e559a0579161/go.mod h1:vgmwKBkx+ca5OIeEvstiQgzAZnb7R6QaqE1oEDSqa6g= github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw= github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= +github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.3/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1 h1:xuHlrdznafh7ul5t4xEncnA4qgpQvJZEw+mr98eqHXw= github.com/filecoin-project/go-bitfield v0.0.4-0.20200703174658-f4a5758051a1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= @@ -254,13 +257,17 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= +github.com/filecoin-project/go-data-transfer v0.4.1-0.20200715144713-b3311844e1a5/go.mod h1:duGDSKvsOxiKl6Dueh8DNA6ZbiM30PWUWlSKjo9ac+o= github.com/filecoin-project/go-data-transfer v0.5.3 h1:pErOk+xeX0eiZ9UJJMJKGBI+WgdN/4/AMXQDlFDfFNg= github.com/filecoin-project/go-data-transfer v0.5.3/go.mod h1:30ROzlBS8tbTkszmW9a6/N4oD5bIh6QRBCXC6lORuI8= github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= +github.com/filecoin-project/go-fil-markets v0.4.1-0.20200715201050-c141144ea312/go.mod h1:MvrpKOiETu39e9H167gdQzdzLNcvHsUp48UkXqPSdtU= github.com/filecoin-project/go-fil-markets v0.5.3 h1:BjEfUIe/ov95jt7K9m9F/ZEJsvdlJAstSPfNyBXcPd0= github.com/filecoin-project/go-fil-markets v0.5.3/go.mod h1:RNaiPhWF8xPrb9oUWJK7FGfD1jkdsk4XjCwczKpwnX0= +github.com/filecoin-project/go-fil-markets v0.5.4 h1:FuK7vSpWFN/sD3rMpP33rk8LbCcx/GUdqn4hUI5M+Ys= +github.com/filecoin-project/go-fil-markets v0.5.4/go.mod h1:RNaiPhWF8xPrb9oUWJK7FGfD1jkdsk4XjCwczKpwnX0= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI= @@ -282,22 +289,38 @@ github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= github.com/filecoin-project/lotus v0.4.3-0.20200805153606-8458f04a6b73 h1:NW3r/lczsOJBlayD3h28spT8slpuPGb3on6bvIBfeNo= github.com/filecoin-project/lotus v0.4.3-0.20200805153606-8458f04a6b73/go.mod h1:eEU4A7/Co8KGKqWAmbMB5YPYuzNB19Nq4ZD+kXaBvIM= +github.com/filecoin-project/lotus v0.4.3-0.20200811132800-b534ab9d3c8a h1:+vBVcNHS/qKVXxS8bVvSL+/l08UIpmaUP5uLXbJFt+U= +github.com/filecoin-project/lotus v0.4.3-0.20200811132800-b534ab9d3c8a/go.mod h1:FWEV/ODzU90SyPxpQfmPbiTWhp2OUdIQO8MmLC35j/M= +github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM= github.com/filecoin-project/sector-storage v0.0.0-20200712023225-1d67dcfa3c15/go.mod h1:salgVdX7qeXFo/xaiEQE29J4pPkjn71T0kt0n+VDBzo= +github.com/filecoin-project/sector-storage v0.0.0-20200717213554-a109ef9cbeab/go.mod h1:7EE+f7jM4kCy2MKHoiiwNDQGJSb+QQzZ+y+/17ugq4w= github.com/filecoin-project/sector-storage v0.0.0-20200730050024-3ee28c3b6d9a/go.mod h1:oOawOl9Yk+qeytLzzIryjI8iRbqo+qzS6EEeElP4PWA= github.com/filecoin-project/sector-storage v0.0.0-20200803184904-3cab915fd225 h1:Or2lM5Cdsq0nDrSWp2YO70tjd8Ohg0jVWT/KGP3BX+I= github.com/filecoin-project/sector-storage v0.0.0-20200803184904-3cab915fd225/go.mod h1:oOawOl9Yk+qeytLzzIryjI8iRbqo+qzS6EEeElP4PWA= +github.com/filecoin-project/sector-storage v0.0.0-20200805173933-deec7a2658d4 h1:vEqr7sPz9sFEe8unfvY5X4DmkQrhmfLvbsXlmVMtmqM= +github.com/filecoin-project/sector-storage v0.0.0-20200805173933-deec7a2658d4/go.mod h1:oOawOl9Yk+qeytLzzIryjI8iRbqo+qzS6EEeElP4PWA= github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA= github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= +github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= github.com/filecoin-project/specs-actors v0.6.1/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY= +github.com/filecoin-project/specs-actors v0.7.0/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws= github.com/filecoin-project/specs-actors v0.7.3-0.20200716231407-60a2ae96d2e6/go.mod h1:JOMUa7EijvpOO4ofD1yeHNmqohkmmnhTvz/IpB6so4c= +github.com/filecoin-project/specs-actors v0.8.1-0.20200720061236-f4719fdd7d90/go.mod h1:JOMUa7EijvpOO4ofD1yeHNmqohkmmnhTvz/IpB6so4c= +github.com/filecoin-project/specs-actors v0.8.1-0.20200720115956-cd051eabf328/go.mod h1:0+CxQ5Jeii3522irTvhKRDpr4GG1bj5Erq3p/d38DzY= github.com/filecoin-project/specs-actors v0.8.2/go.mod h1:Q3ACV5kBLvqPaYbthc/J1lGMJ5OwogmD9pzdtPRMdCw= github.com/filecoin-project/specs-actors v0.8.5/go.mod h1:Q3ACV5kBLvqPaYbthc/J1lGMJ5OwogmD9pzdtPRMdCw= github.com/filecoin-project/specs-actors v0.8.6 h1:EnmrHqpzURzGeSDbeui/snQTVzl/RXoSEgAf5xMREaI= github.com/filecoin-project/specs-actors v0.8.6/go.mod h1:QRihI/fadrhWzt7HH6mT32upOdDFpSYCFnr3JEI1L50= +github.com/filecoin-project/specs-actors v0.8.7-0.20200805174427-9d42fb163883 h1:/LOix6EwCWshEuufhM8gAuCqP5jPWVMqPZn8HUQCw3Y= +github.com/filecoin-project/specs-actors v0.8.7-0.20200805174427-9d42fb163883/go.mod h1:QRihI/fadrhWzt7HH6mT32upOdDFpSYCFnr3JEI1L50= +github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY= github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k= +github.com/filecoin-project/storage-fsm v0.0.0-20200721113842-ab98dc7ab341/go.mod h1:1CGbd11KkHuyWPT+xwwCol1zl/jnlpiKD2L4fzKxaiI= github.com/filecoin-project/storage-fsm v0.0.0-20200730122205-d423ae90d8d4 h1:Eg7Ia3iRWKMXpS7bU8ufarQJyGsBor7eGgfrAHfn8HA= github.com/filecoin-project/storage-fsm v0.0.0-20200730122205-d423ae90d8d4/go.mod h1:1CGbd11KkHuyWPT+xwwCol1zl/jnlpiKD2L4fzKxaiI= +github.com/filecoin-project/storage-fsm v0.0.0-20200805013058-9d9ea4e6331f h1:WeMFRLMtAFqUwobouSeYj3pfgYtsSUwi3ztqDzFJMZY= +github.com/filecoin-project/storage-fsm v0.0.0-20200805013058-9d9ea4e6331f/go.mod h1:1CGbd11KkHuyWPT+xwwCol1zl/jnlpiKD2L4fzKxaiI= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= @@ -563,10 +586,13 @@ github.com/ipfs/go-filestore v1.0.0 h1:QR7ekKH+q2AGiWDc7W2Q0qHuYSRZGUJqUn0GsegEP github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPiFOdcuu9SM= github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0= github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y= +github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-graphsync v0.1.0 h1:RjLk7ha1tJtDXktqoxOjhvx4lDuzzIU+xQ+PEi74r3s= github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-hamt-ipld v0.0.15-0.20200131012125-dd88a59d3f2e/go.mod h1:9aQJu/i/TaRDW6jqB5U217dLIDopn50wxLdHXM2CTfE= github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242/go.mod h1:kq3Pi+UP3oHhAdKexE+kHHYRKMoFNuGero0R7q3hWGg= +github.com/ipfs/go-hamt-ipld v0.1.1-0.20200501020327-d53d20a7063e/go.mod h1:giiPqWYCnRBYpNTsJ/EX1ojldX5kTXrXYckSJQ7ko9M= +github.com/ipfs/go-hamt-ipld v0.1.1-0.20200605182717-0310ad2b0b1f/go.mod h1:phOFBB7W73N9dg1glcb1fQ9HtQFDUpeyJgatW8ns0bw= github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= @@ -670,6 +696,7 @@ github.com/ipfs/iptb v1.4.0 h1:YFYTrCkLMRwk/35IMyC6+yjoQSHTEcNcefBStLJzgvo= github.com/ipfs/iptb v1.4.0/go.mod h1:1rzHpCYtNp87/+hTxG5TfCVn/yMY3dKnLn8tBiMfdmg= github.com/ipfs/iptb-plugins v0.2.1 h1:au4HWn9/pRPbkxA08pDx2oRAs4cnbgQWgV0teYXuuGA= github.com/ipfs/iptb-plugins v0.2.1/go.mod h1:QXMbtIWZ+jRsW8a4h13qAKU7jcM7qaittO8wOsTP0Rs= +github.com/ipld/go-car v0.1.1-0.20200429200904-c222d793c339/go.mod h1:eajxljm6I8o3LitnFeVEmucwZmz7+yLSiKce9yYMefg= github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae h1:OV9dxl8iPMCOD8Vi/hvFwRh3JWPXqmkYSVxWr9JnEzM= github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae/go.mod h1:2mvxpu4dKRnuH3mj5u6KW/tmRSCcXvy/KYiJ4nC6h4c= github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e h1:ZISbJlM0urTANR9KRfRaqlBmyOj5uUtxs2r4Up9IXsA= @@ -787,12 +814,15 @@ github.com/libp2p/go-libp2p v0.6.1/go.mod h1:CTFnWXogryAHjXAKEbOf1OWY+VeAP3lDMZk github.com/libp2p/go-libp2p v0.7.0/go.mod h1:hZJf8txWeCduQRDC/WSqBGMxaTHCOYHt2xSU1ivxn0k= github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw= github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o= +github.com/libp2p/go-libp2p v0.8.2/go.mod h1:NQDA/F/qArMHGe0J7sDScaKjW8Jh4y/ozQqBbYJ+BnA= github.com/libp2p/go-libp2p v0.8.3/go.mod h1:EsH1A+8yoWK+L4iKcbPYu6MPluZ+CHWI9El8cTaefiM= github.com/libp2p/go-libp2p v0.9.2/go.mod h1:cunHNLDVus66Ct9iXXcjKRLdmHdFdHVe1TAnbubJQqQ= github.com/libp2p/go-libp2p v0.10.0 h1:7ooOvK1wi8eLpyTppy8TeH43UHy5uI75GAHGJxenUi0= github.com/libp2p/go-libp2p v0.10.0/go.mod h1:yBJNpb+mGJdgrwbKAKrhPU0u3ogyNFTfjJ6bdM+Q/G8= github.com/libp2p/go-libp2p v0.10.2 h1:VQOo/Pbj9Ijco9jiMYN5ImAg236IjTXfnUPJ2OvbpLM= github.com/libp2p/go-libp2p v0.10.2/go.mod h1:BYckt6lmS/oA1SlRETSPWSUulCQKiZuTVsymVMc//HQ= +github.com/libp2p/go-libp2p v0.10.3 h1:Bc8/VjmC+pICtK6xG8YgVutZvCdK0MsroWCHP+6AdFQ= +github.com/libp2p/go-libp2p v0.10.3/go.mod h1:0ER6iPSaPeQjryNgOnm9bLNpMJCYmuw54xJXsVR17eE= github.com/libp2p/go-libp2p-autonat v0.0.2/go.mod h1:fs71q5Xk+pdnKU014o2iq1RhMs9/PMaG5zXRFNnIIT4= github.com/libp2p/go-libp2p-autonat v0.0.6/go.mod h1:uZneLdOkZHro35xIhpbtTzLlgYturpu4J5+0cZK3MqE= github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8= @@ -804,6 +834,8 @@ github.com/libp2p/go-libp2p-autonat v0.2.3 h1:w46bKK3KTOUWDe5mDYMRjJu1uryqBp8HCN github.com/libp2p/go-libp2p-autonat v0.2.3/go.mod h1:2U6bNWCNsAG9LEbwccBDQbjzQ8Krdjge1jLTE9rdoMM= github.com/libp2p/go-libp2p-autonat v0.3.1 h1:60sc3NuQz+RxEb4ZVCRp/7uPtD7gnlLcOIKYNulzSIo= github.com/libp2p/go-libp2p-autonat v0.3.1/go.mod h1:0OzOi1/cVc7UcxfOddemYD5vzEqi4fwRbnZcJGLi68U= +github.com/libp2p/go-libp2p-autonat v0.3.2 h1:OhDSwVVaq7liTaRIsFFYvsaPp0pn2yi0WazejZ4DUmo= +github.com/libp2p/go-libp2p-autonat v0.3.2/go.mod h1:0OzOi1/cVc7UcxfOddemYD5vzEqi4fwRbnZcJGLi68U= github.com/libp2p/go-libp2p-autonat-svc v0.1.0/go.mod h1:fqi8Obl/z3R4PFVLm8xFtZ6PBL9MlV/xumymRFkKq5A= github.com/libp2p/go-libp2p-blankhost v0.0.1/go.mod h1:Ibpbw/7cPPYwFb7PACIWdvxxv0t0XCCI10t7czjAjTc= github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro= @@ -875,6 +907,7 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k= github.com/libp2p/go-libp2p-kad-dht v0.2.1/go.mod h1:k7ONOlup7HKzQ68dE6lSnp07cdxdkmnRa+6B4Fh9/w0= +github.com/libp2p/go-libp2p-kad-dht v0.8.1/go.mod h1:u3rbYbp3CSraAHD5s81CJ3hHozKTud/UOXfAgh93Gek= github.com/libp2p/go-libp2p-kad-dht v0.8.3 h1:ceK5ML6s/I8UAcw6veoNsuEHdHvfo88leU/5uWOIFWs= github.com/libp2p/go-libp2p-kad-dht v0.8.3/go.mod h1:HnYYy8taJWESkqiESd1ngb9XX/XGGsMA5G0Vj2HoSh4= github.com/libp2p/go-libp2p-kbucket v0.2.1/go.mod h1:/Rtu8tqbJ4WQ2KTCOMJhggMukOLNLNPY1EtEWWLxUvc= @@ -930,6 +963,8 @@ github.com/libp2p/go-libp2p-pubsub v0.3.2 h1:k3cJm5JW5mjaWZkobS50sJLJWaB2mBi0HW4 github.com/libp2p/go-libp2p-pubsub v0.3.2/go.mod h1:Uss7/Cfz872KggNb+doCVPHeCDmXB7z500m/R8DaAUk= github.com/libp2p/go-libp2p-pubsub v0.3.4-0.20200731161531-2b5243c72f0d h1:1kfMc74C1DZGh97VJpA5efPXWU3tmdRF/wKYbFYya/4= github.com/libp2p/go-libp2p-pubsub v0.3.4-0.20200731161531-2b5243c72f0d/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI= +github.com/libp2p/go-libp2p-pubsub v0.3.4 h1:8PollxXtUvzy0DMn5XFMg/JihjaKboWyk3ML6yRW1Lk= +github.com/libp2p/go-libp2p-pubsub v0.3.4/go.mod h1:DTMSVmZZfXodB/pvdTGrY2eHPZ9W2ev7hzTH83OKHrI= github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6 h1:2lH7rMlvDPSvXeOR+g7FE6aqiEwxtpxWKQL8uigk5fQ= github.com/libp2p/go-libp2p-pubsub-tracer v0.0.0-20200626141350-e730b32bf1e6/go.mod h1:8ZodgKS4qRLayfw9FDKDd9DX4C16/GMofDxSldG8QPI= github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU= @@ -1468,6 +1503,7 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:X github.com/whyrusleeping/cbor-gen v0.0.0-20200206220010-03c9665e2a66/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/cbor-gen v0.0.0-20200414195334-429a0b5e922e/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= +github.com/whyrusleeping/cbor-gen v0.0.0-20200501014322-5f9941ef88e0/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/cbor-gen v0.0.0-20200501232601-351665a6e756/go.mod h1:W5MvapuoHRP8rz4vxjwCK1pDqF1aQcWsV5PZ+AHbqdg= github.com/whyrusleeping/cbor-gen v0.0.0-20200504204219-64967432584d/go.mod h1:W5MvapuoHRP8rz4vxjwCK1pDqF1aQcWsV5PZ+AHbqdg= github.com/whyrusleeping/cbor-gen v0.0.0-20200710004633-5379fc63235d/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= diff --git a/lotus-soup/testkit/memjournal/memory.go b/lotus-soup/testkit/memjournal/memory.go new file mode 100644 index 00000000..0d316318 --- /dev/null +++ b/lotus-soup/testkit/memjournal/memory.go @@ -0,0 +1,249 @@ +package memjournal + +import ( + "context" + "sync/atomic" + + "go.uber.org/fx" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/journal" +) + +// Control messages. +type ( + clearCtrl struct{} + addObserverCtrl struct { + observer *observer + replay bool + } + rmObserverCtrl *observer + getEntriesCtrl chan []*journal.Event +) + +type MemJournal struct { + journal.EventTypeFactory + + entries []*journal.Event + index map[string]map[string][]*journal.Event + observers []observer + + incomingCh chan *journal.Event + controlCh chan interface{} + + state int32 // guarded by atomic; 0=closed, 1=running. + closed chan struct{} +} + +var _ journal.Journal = (*MemJournal)(nil) + +type observer struct { + accept map[journal.EventType]struct{} + ch chan *journal.Event +} + +func (o *observer) dispatch(entry *journal.Event) { + if o.accept == nil { + o.ch <- entry + } + if _, ok := o.accept[entry.EventType]; ok { + o.ch <- entry + } +} + +func NewMemoryJournal(lc fx.Lifecycle, disabled journal.DisabledEvents) *MemJournal { + m := &MemJournal{ + EventTypeFactory: journal.NewEventTypeFactory(disabled), + + index: make(map[string]map[string][]*journal.Event, 16), + observers: make([]observer, 0, 16), + incomingCh: make(chan *journal.Event, 256), + controlCh: make(chan interface{}, 16), + state: 1, + closed: make(chan struct{}), + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { return m.Close() }, + }) + + go m.process() + + return m +} + +func (m *MemJournal) RecordEvent(evtType journal.EventType, obj interface{}) { + if !evtType.Enabled() { + // tried to record a disabled event type, or used an invalid EventType. + return + } + + entry := &journal.Event{ + EventType: evtType, + Timestamp: build.Clock.Now(), + Data: obj, + } + + select { + case m.incomingCh <- entry: + case <-m.closed: + } +} + +func (m *MemJournal) Close() error { + if !atomic.CompareAndSwapInt32(&m.state, 1, 0) { + // already closed. + return nil + } + close(m.closed) + return nil +} + +func (m *MemJournal) Clear() { + select { + case m.controlCh <- clearCtrl{}: + case <-m.closed: + } +} + +// Observe starts observing events that are recorded in the MemJournal, and +// returns a channel where new events will be sent. When replay is true, all +// entries that have been recorded prior to the observer being registered will +// be replayed. To restrict the event types this observer will sent, use the +// include argument. If no include set is passed, the observer will receive all +// events types. +func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...journal.EventType) <-chan *journal.Event { + var acc map[journal.EventType]struct{} + if include != nil { + acc = make(map[journal.EventType]struct{}, len(include)) + for _, et := range include { + if !et.Enabled() { + // skip over disabled event type. + continue + } + acc[et] = struct{}{} + } + } + + ch := make(chan *journal.Event, 256) + o := &observer{ + accept: acc, + ch: ch, + } + + // watch the context, and fire the "remove observer" control message upon + // cancellation. + go func() { + <-ctx.Done() + select { + case m.controlCh <- rmObserverCtrl(o): + case <-m.closed: + } + }() + + select { + case m.controlCh <- addObserverCtrl{o, replay}: + case <-m.closed: + // we are already stopped. + close(ch) + } + + return ch +} + +// Entries gets a snapshot of stored entries. +func (m *MemJournal) Entries() []*journal.Event { + ch := make(chan []*journal.Event) + m.controlCh <- getEntriesCtrl(ch) + return <-ch +} + +func (m *MemJournal) process() { + processCtrlMsg := func(message interface{}) { + switch msg := message.(type) { + case addObserverCtrl: + // adding an observer. + m.observers = append(m.observers, *msg.observer) + + if msg.replay { + // replay all existing entries. + for _, e := range m.entries { + msg.observer.dispatch(e) + } + } + case rmObserverCtrl: + // removing an observer; find the observer, close its channel. + // then discard it from our list by replacing it with the last + // observer and reslicing. + for i, o := range m.observers { + if o.ch == msg.ch { + close(o.ch) + m.observers[i] = m.observers[len(m.observers)-1] + m.observers = m.observers[:len(m.observers)-1] + } + } + case clearCtrl: + m.entries = m.entries[0:0] + // carry over system and event names; there are unlikely to change; + // just reslice the entry slices, so we are not thrashing memory. + for _, events := range m.index { + for ev := range events { + events[ev] = events[ev][0:0] + } + } + case getEntriesCtrl: + cpy := make([]*journal.Event, len(m.entries)) + copy(cpy, m.entries) + msg <- cpy + close(msg) + } + } + + processClose := func() { + m.entries = nil + m.index = make(map[string]map[string][]*journal.Event, 16) + for _, o := range m.observers { + close(o.ch) + } + m.observers = nil + } + + for { + // Drain all control messages first! + select { + case msg := <-m.controlCh: + processCtrlMsg(msg) + continue + case <-m.closed: + processClose() + return + default: + } + + // Now consume and pipe messages. + select { + case entry := <-m.incomingCh: + m.entries = append(m.entries, entry) + events := m.index[entry.System] + if events == nil { + events = make(map[string][]*journal.Event, 16) + m.index[entry.System] = events + } + + entries := events[entry.Event] + events[entry.Event] = append(entries, entry) + + for _, o := range m.observers { + o.dispatch(entry) + } + + case msg := <-m.controlCh: + processCtrlMsg(msg) + continue + + case <-m.closed: + processClose() + return + } + } +} diff --git a/lotus-soup/testkit/memjournal/memory_test.go b/lotus-soup/testkit/memjournal/memory_test.go new file mode 100644 index 00000000..b0674e28 --- /dev/null +++ b/lotus-soup/testkit/memjournal/memory_test.go @@ -0,0 +1,185 @@ +package memjournal + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/raulk/clock" + "github.com/stretchr/testify/require" + "go.uber.org/fx/fxtest" +) + +func TestMemJournal_AddEntry(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + clk := clock.NewMock() + build.Clock = clk + + jrnl := NewMemoryJournal(lc, nil) + addEntries(jrnl, 100) + + require.Eventually(t, func() bool { return len(jrnl.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + entries := jrnl.Entries() + cnt := make(map[string]int, 10) + for i, e := range entries { + require.EqualValues(t, "spaceship", e.System) + require.Equal(t, store.HeadChangeEvt{ + From: types.TipSetKey{}, + FromHeight: abi.ChainEpoch(i), + To: types.TipSetKey{}, + ToHeight: abi.ChainEpoch(i), + RevertCount: i, + ApplyCount: i, + }, e.Data) + require.Equal(t, build.Clock.Now(), e.Timestamp) + cnt[e.Event]++ + } + + // we received 10 entries of each event type. + for _, c := range cnt { + require.Equal(t, 10, c) + } +} + +func TestMemJournal_Close(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + jrnl := NewMemoryJournal(lc, nil) + addEntries(jrnl, 100) + + require.Eventually(t, func() bool { return len(jrnl.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + o1 := jrnl.Observe(context.TODO(), false) + o2 := jrnl.Observe(context.TODO(), false) + o3 := jrnl.Observe(context.TODO(), false) + + time.Sleep(500 * time.Millisecond) + + // Close the journal. + require.NoError(t, jrnl.Close()) + + time.Sleep(500 * time.Millisecond) + +NextChannel: + for _, ch := range []<-chan *journal.Event{o1, o2, o3} { + for { + select { + case _, more := <-ch: + if more { + // keep consuming + } else { + continue NextChannel + } + default: + t.Fatal("nothing more to consume, and channel is not closed") + } + } + } +} + +func TestMemJournal_Clear(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + journal.Clear() + require.Empty(t, journal.Entries()) + require.Empty(t, journal.Entries()) + require.Empty(t, journal.Entries()) +} + +func TestMemJournal_Observe(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + et1 := journal.RegisterEventType("spaceship", "wheezing-1") + et2 := journal.RegisterEventType("spaceship", "wheezing-2") + + o1 := journal.Observe(context.TODO(), false, et1) + o2 := journal.Observe(context.TODO(), true, et1, et2) + o3 := journal.Observe(context.TODO(), true) + + time.Sleep(1 * time.Second) + + require.Len(t, o1, 0) // no replay + require.Len(t, o2, 20) // replay with include set + require.Len(t, o3, 100) // replay with no include set (all entries) + + // add another 100 entries and assert what the observers have seen. + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 200 }, 1*time.Second, 100*time.Millisecond) + + // note: we're able to queue items because the observer channel buffer size is 256. + require.Len(t, o1, 10) // should have 0 old entries + 10 new entries + require.Len(t, o2, 40) // should have 20 old entries + 20 new entries + require.Len(t, o3, 200) // should have 100 old entries + 100 new entries +} + +func TestMemJournal_ObserverCancellation(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + + ctx, cancel := context.WithCancel(context.TODO()) + o1 := journal.Observe(ctx, false) + o2 := journal.Observe(context.TODO(), false) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + // all observers have received the 100 entries. + require.Len(t, o1, 100) + require.Len(t, o2, 100) + + // cancel o1's context. + cancel() + time.Sleep(500 * time.Millisecond) + + // add 50 new entries + addEntries(journal, 50) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 150 }, 1*time.Second, 100*time.Millisecond) + + require.Len(t, o1, 100) // has not moved. + require.Len(t, o2, 150) // should have 100 old entries + 50 new entries +} + +func addEntries(journal *MemJournal, count int) { + for i := 0; i < count; i++ { + eventIdx := i % 10 + + // RegisterEventType is not _really_ intended to be used this way (on every write). + et := journal.RegisterEventType("spaceship", fmt.Sprintf("wheezing-%d", eventIdx)) + journal.RecordEvent(et, store.HeadChangeEvt{ + From: types.TipSetKey{}, + FromHeight: abi.ChainEpoch(i), + To: types.TipSetKey{}, + ToHeight: abi.ChainEpoch(i), + RevertCount: i, + ApplyCount: i, + }) + } +}