diff --git a/adapter/interface.go b/adapter/interface.go index fa29732..43f7e50 100644 --- a/adapter/interface.go +++ b/adapter/interface.go @@ -8,6 +8,13 @@ import ( "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" + "github.com/qiniu/qmgo" + opts "github.com/qiniu/qmgo/options" + "github.com/streadway/amqp" + "go.mongodb.org/mongo-driver/mongo" + "gorm.io/gorm" + "gorm.io/gorm/clause" + "github.com/pinguo/pgo2/client/es" "github.com/pinguo/pgo2/client/maxmind" "github.com/pinguo/pgo2/client/memcache" @@ -17,12 +24,6 @@ import ( "github.com/pinguo/pgo2/client/redis" "github.com/pinguo/pgo2/iface" "github.com/pinguo/pgo2/value" - "github.com/qiniu/qmgo" - opts "github.com/qiniu/qmgo/options" - "github.com/streadway/amqp" - "go.mongodb.org/mongo-driver/mongo" - "gorm.io/gorm" - "gorm.io/gorm/clause" ) type IMemory interface { @@ -217,6 +218,14 @@ type IOrm interface { type IMongodb interface { iface.IObject + Count(query interface{}) (int, error) + DeleteAll(query interface{}) error + DeleteOne(query interface{}) error + InsertAll(docs interface{}) error + PipeAll(query interface{}, desc interface{}) error + UpdateOrInsert(query interface{}, doc interface{}) error + FindOne(query interface{}, doc interface{}, options ...bson.M) error + FindAll(query interface{}, doc interface{}, options ...bson.M) error InsertOne(doc interface{}, opts ...opts.InsertOneOptions) (result *qmgo.InsertOneResult, err error) InsertOneCtx(ctx context.Context, doc interface{}, opts ...opts.InsertOneOptions) (result *qmgo.InsertOneResult, err error) InsertMany(docs interface{}, opts ...opts.InsertManyOptions) (result *qmgo.InsertManyResult, err error) @@ -242,7 +251,7 @@ type IMongodb interface { Aggregate(pipeline interface{}) IMongodbAggregate AggregateCtx(ctx context.Context, pipeline interface{}) IMongodbAggregate EnsureIndexes(uniques []string, indexes []string) (err error) - EnsureIndexesCtx(ctx context.Context, uniques []string, indexes []string)(err error) + EnsureIndexesCtx(ctx context.Context, uniques []string, indexes []string) (err error) CreateIndexes(indexes []opts.IndexModel) (err error) CreateIndexesCtx(ctx context.Context, indexes []opts.IndexModel) (err error) CreateOneIndex(index opts.IndexModel) error @@ -257,7 +266,7 @@ type IMongodb interface { GetCollectionName() string Find(filter interface{}, options ...opts.FindOptions) IMongodbQuery FindCtx(ctx context.Context, filter interface{}, options ...opts.FindOptions) IMongodbQuery - GetClient() *mongodb.Client + GetClient() *mongodb.Client Session() (*qmgo.Session, error) DoTransaction(ctx context.Context, callback func(sessCtx context.Context) (interface{}, error)) (interface{}, error) } diff --git a/adapter/mongodb.go b/adapter/mongodb.go index 1537917..18fb8b6 100644 --- a/adapter/mongodb.go +++ b/adapter/mongodb.go @@ -2,11 +2,15 @@ package adapter import ( "context" + "fmt" - "github.com/pinguo/pgo2" - "github.com/pinguo/pgo2/client/mongodb" + "github.com/globalsign/mgo/bson" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/pinguo/pgo2" + "github.com/pinguo/pgo2/client/mongodb" opts "github.com/qiniu/qmgo/options" ) @@ -48,7 +52,7 @@ type Mongodb struct { coll string } -// GetObjBox fetch is performed automatically +// GetObjBox fetch is performed automatically func (m *Mongodb) Prepare(db, coll string, componentId ...string) { if db == "" || coll == "" { panic("db and coll can not empty") @@ -79,9 +83,9 @@ func (m *Mongodb) InsertOne(doc interface{}, opts ...opts.InsertOneOptions) (res profile := "mongodb.InsertOne" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() - //ctx := context.Background() + // ctx := context.Background() return m.client.MClient.Database(m.db).Collection(m.coll).InsertOne(ctx, doc, opts...) } @@ -100,7 +104,7 @@ func (m *Mongodb) InsertMany(docs interface{}, opts ...opts.InsertManyOptions) ( profile := "mongodb.InsertMany" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).InsertMany(ctx, docs, opts...) @@ -124,7 +128,7 @@ func (m *Mongodb) Upsert(filter interface{}, replacement interface{}, opts ...op profile := "mongodb.Upsert" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).Upsert(ctx, filter, replacement, opts...) @@ -146,7 +150,7 @@ func (m *Mongodb) UpsertId(id interface{}, replacement interface{}, opts ...opts profile := "mongodb.UpsertId" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).UpsertId(ctx, id, replacement, opts...) @@ -160,13 +164,167 @@ func (m *Mongodb) UpsertIdCtx(ctx context.Context, id interface{}, replacement i return m.client.MClient.Database(m.db).Collection(m.coll).Upsert(ctx, id, replacement, opts...) } +func handleOptions(query qmgo.QueryI, options ...bson.M) (qmgo.QueryI, error) { + opts := make(map[string]interface{}) + for _, opt := range options { + if opt == nil { + continue + } + for k, v := range opt { + opts[k] = v + } + } + if sort, ok := opts["sort"]; ok { + switch sort.(type) { + case string: + query.Sort(sort.(string)) + case []string: + query.Sort(sort.([]string)...) + default: + return nil, fmt.Errorf("invalid mongo sort:%#v", sort) + } + } + number := func(name string) (int64, error) { + rl := int64(0) + v, ok := opts[name] + if !ok { + return 0, nil + } + switch v.(type) { + case int: + rl = int64(v.(int)) + case int32: + rl = int64(v.(int32)) + case int64: + rl = v.(int64) + default: + return 0, fmt.Errorf("invalid mongo limit: %#v", v) + } + return rl, nil + } + if limit, err := number("limit"); err != nil { + return nil, err + } else if limit > 0 { + query.Limit(limit) + } + + if skip, err := number("skip"); err != nil { + return nil, err + } else if skip > 0 { + query.Skip(skip) + } + return query, nil +} + +// DeleteAll delete all doc +func (m *Mongodb) DeleteAll(filter interface{}) error { + profile := "mongodb.DeleteAll" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + _, err := col.RemoveAll(ctx, filter) + return err +} + +// Count count docs +func (m *Mongodb) Count(filter interface{}) (int, error) { + profile := "mongodb.Count" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + n, err := col.Find(ctx, filter).Count() + return int(n), err +} + +// DeleteOne delete one, 暂时没有one +func (m *Mongodb) DeleteOne(filter interface{}) error { + profile := "mongodb.DeleteOne" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + return col.Remove(ctx, filter, opts.RemoveOptions{ + DeleteOptions: &options.DeleteOptions{}, + }) +} + +func (m *Mongodb) InsertAll(docs interface{}) error { + profile := "mongodb.InsertAll" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + _, err := col.InsertMany(ctx, docs) + return err +} + +func (m *Mongodb) UpdateOrInsert(query interface{}, doc interface{}) error { + profile := "mongodb.UpdateOrInsert" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + _, err := col.Upsert(ctx, query, doc) + return err +} + +func (m *Mongodb) PipeAll(query interface{}, doc interface{}) error { + profile := "mongodb.PipeAll" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + ag := col.Aggregate(ctx, query) + return ag.All(doc) +} + +// FindAll query all doc +func (m *Mongodb) FindAll(filter interface{}, doc interface{}, options ...bson.M) (err error) { + profile := "mongodb.FindAll" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + findOpts := opts.FindOptions{} + cursor, err := handleOptions(col.Find(ctx, filter, findOpts), options...) + if err != nil { + return err + } + return cursor.All(doc) +} + +// FindOne query one doc +func (m *Mongodb) FindOne(filter interface{}, doc interface{}, options ...bson.M) (err error) { + profile := "mongodb.FindOne" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) + defer cFunc() + col := m.client.MClient.Database(m.db).Collection(m.coll) + findOpts := opts.FindOptions{} + cursor, err := handleOptions(col.Find(ctx, filter, findOpts), options...) + if err != nil { + return err + } + return cursor.One(doc) +} + // UpdateOne executes an update command to update at most one document in the collection. // Reference: https://docs.mongodb.com/manual/reference/operator/update/ func (m *Mongodb) UpdateOne(filter interface{}, update interface{}, opts ...opts.UpdateOptions) (err error) { profile := "mongodb.UpdateOne" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).UpdateOne(ctx, filter, update, opts...) @@ -186,7 +344,7 @@ func (m *Mongodb) UpdateId(id interface{}, update interface{}, opts ...opts.Upda profile := "mongodb.UpdateId" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).UpdateId(ctx, id, update, opts...) @@ -207,7 +365,7 @@ func (m *Mongodb) UpdateAll(filter interface{}, update interface{}, opts ...opts profile := "mongodb.UpdateAll" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).UpdateAll(ctx, filter, update, opts...) @@ -228,7 +386,7 @@ func (m *Mongodb) ReplaceOne(filter interface{}, doc interface{}, opts ...opts.R profile := "mongodb.ReplaceOne" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).ReplaceOne(ctx, filter, doc, opts...) @@ -249,7 +407,7 @@ func (m *Mongodb) Remove(filter interface{}, opts ...opts.RemoveOptions) (err er profile := "mongodb.Remove" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).Remove(ctx, filter, opts...) @@ -268,7 +426,7 @@ func (m *Mongodb) RemoveId(id interface{}, opts ...opts.RemoveOptions) (err erro profile := "mongodb.RemoveId" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).Remove(ctx, id, opts...) @@ -286,7 +444,7 @@ func (m *Mongodb) RemoveAll(filter interface{}, opts ...opts.RemoveOptions) (res profile := "mongodb.RemoveAll" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).RemoveAll(ctx, filter, opts...) @@ -302,15 +460,15 @@ func (m *Mongodb) RemoveAllCtx(ctx context.Context, filter interface{}, opts ... // Aggregate executes an aggregate command against the collection and returns a AggregateI to get resulting documents. func (m *Mongodb) Aggregate(pipeline interface{}) IMongodbAggregate { - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.ReadTimeout()) - aggregate:=m.client.MClient.Database(m.db).Collection(m.coll).Aggregate(ctx, pipeline) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.ReadTimeout()) + aggregate := m.client.MClient.Database(m.db).Collection(m.coll).Aggregate(ctx, pipeline) return m.GetObjBoxCtx(m.Context(), MongodbCAggregate, aggregate, cFunc).(IMongodbAggregate) } func (m *Mongodb) AggregateCtx(ctx context.Context, pipeline interface{}) IMongodbAggregate { - aggregate:=m.client.MClient.Database(m.db).Collection(m.coll).Aggregate(ctx, pipeline) + aggregate := m.client.MClient.Database(m.db).Collection(m.coll).Aggregate(ctx, pipeline) return m.GetObjBoxCtx(m.Context(), MongodbCAggregate, aggregate).(IMongodbAggregate) } @@ -325,7 +483,7 @@ func (m *Mongodb) EnsureIndexes(uniques []string, indexes []string) (err error) profile := "mongodb.EnsureIndexes" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).EnsureIndexes(ctx, uniques, indexes) @@ -346,7 +504,7 @@ func (m *Mongodb) CreateIndexes(indexes []opts.IndexModel) (err error) { profile := "mongodb.CreateIndexes" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).CreateIndexes(ctx, indexes) @@ -367,7 +525,7 @@ func (m *Mongodb) CreateOneIndex(index opts.IndexModel) error { profile := "mongodb.CreateOneIndex" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).CreateOneIndex(ctx, index) @@ -387,7 +545,7 @@ func (m *Mongodb) DropAllIndexes() (err error) { profile := "mongodb.DropAllIndexes" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).DropAllIndexes(ctx) @@ -408,7 +566,7 @@ func (m *Mongodb) DropIndex(indexes []string) error { profile := "mongodb.DropIndex" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).DropIndex(ctx, indexes) @@ -428,7 +586,7 @@ func (m *Mongodb) DropCollection() error { profile := "mongodb.DropCollection" m.Context().ProfileStart(profile) defer m.Context().ProfileStop(profile) - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.WriteTimeout()) + ctx, cFunc := context.WithTimeout(context.Background(), m.client.WriteTimeout()) defer cFunc() return m.client.MClient.Database(m.db).Collection(m.coll).DropCollection(ctx) @@ -463,8 +621,8 @@ func (m *Mongodb) DoTransaction(ctx context.Context, callback func(sessCtx conte } func (m *Mongodb) Find(filter interface{}, options ...opts.FindOptions) IMongodbQuery { - ctx,cFunc:=context.WithTimeout(context.Background(),m.client.ReadTimeout()) -// defer cFunc() + ctx, cFunc := context.WithTimeout(context.Background(), m.client.ReadTimeout()) + // defer cFunc() query := m.client.MClient.Database(m.db).Collection(m.coll).Find(ctx, filter, options...) return m.GetObjBoxCtx(m.Context(), MongodbCQueryClass, query, cFunc).(*CQuery) @@ -481,13 +639,11 @@ type CQuery struct { ctxCancel context.CancelFunc } -func (c *CQuery) Prepare(q qmgo.QueryI,dftCtxCancel ...context.CancelFunc) { +func (c *CQuery) Prepare(q qmgo.QueryI, dftCtxCancel ...context.CancelFunc) { c.QueryI = q if len(dftCtxCancel) > 0 { c.ctxCancel = dftCtxCancel[0] } - - } func (c *CQuery) Sort(fields ...string) qmgo.QueryI { @@ -576,7 +732,7 @@ type CAggregate struct { ctxCancel context.CancelFunc } -func (c *CAggregate) Prepare(q qmgo.AggregateI,dftCtxCancel ...context.CancelFunc) { +func (c *CAggregate) Prepare(q qmgo.AggregateI, dftCtxCancel ...context.CancelFunc) { c.AggregateI = q if len(dftCtxCancel) > 0 { c.ctxCancel = dftCtxCancel[0] @@ -584,7 +740,7 @@ func (c *CAggregate) Prepare(q qmgo.AggregateI,dftCtxCancel ...context.CancelFun } -func (c *CAggregate) All(results interface{}) error{ +func (c *CAggregate) All(results interface{}) error { profile := "mongodb.aggregate.All" c.Context().ProfileStart(profile) defer c.Context().ProfileStop(profile) @@ -595,7 +751,7 @@ func (c *CAggregate) All(results interface{}) error{ return c.AggregateI.All(results) } -func (c *CAggregate) One(results interface{}) error{ +func (c *CAggregate) One(results interface{}) error { profile := "mongodb.aggregate.One" c.Context().ProfileStart(profile) defer c.Context().ProfileStop(profile) diff --git a/go.mod b/go.mod index 59112d2..e74a663 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,10 @@ require ( github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 github.com/oschwald/maxminddb-golang v1.5.0 - github.com/qiniu/qmgo v0.9.1 + github.com/qiniu/qmgo v1.1.8 github.com/streadway/amqp v1.0.0 - go.mongodb.org/mongo-driver v1.4.0 + github.com/stretchr/objx v0.1.1 // indirect + go.mongodb.org/mongo-driver v1.11.6 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v2 v2.2.7 gorm.io/driver/mysql v1.0.3