diff --git a/adapter/interface.go b/adapter/interface.go index dd7cddc..5db7ad9 100644 --- a/adapter/interface.go +++ b/adapter/interface.go @@ -224,6 +224,7 @@ type IMongodb interface { DeleteOne(query interface{}) error InsertAll(docs interface{}) error PipeAll(query interface{}, desc interface{}) error + PipeOne(pipeline interface{}, result interface{}) error UpdateOrInsert(query interface{}, doc interface{}) error FindOne(query interface{}, doc interface{}, options ...bson.M) error FindAll(query interface{}, doc interface{}, options ...bson.M) error diff --git a/adapter/mongodb.go b/adapter/mongodb.go index b0f2825..0672505 100644 --- a/adapter/mongodb.go +++ b/adapter/mongodb.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/globalsign/mgo" "github.com/globalsign/mgo/bson" "github.com/qiniu/qmgo" "go.mongodb.org/mongo-driver/mongo" @@ -291,6 +292,29 @@ func (m *Mongodb) UpdateOrInsert(query interface{}, doc interface{}) error { return err } +// PipeOne execute aggregation queries and get the first item from result set. +// param pipeline must be a slice, such as []bson.M, +// param result is a pointer to interface{}, map, bson.M or bson compatible struct. +// for example: +// +// pipeline := []bson.M{ +// bson.M{"$match": bson.M{"status":"A"}}, +// bson.M{"$group": bson.M{"_id":"$field1", "total":"$field2"}}, +// } +// m.PipeOne(pipeline, &result) +func (m *Mongodb) PipeOne(pipeline interface{}, result interface{}) error { + profile := "Mongo.PipeOne" + m.Context().ProfileStart(profile) + defer m.Context().ProfileStop(profile) + col := m.client.MClient.Database(m.db).Collection(m.coll) + e := col.Aggregate(context.Background(), pipeline).One(result) + if e != nil && e != mgo.ErrNotFound { + m.Context().Error(profile + " error, " + e.Error()) + } + + return e +} + func (m *Mongodb) PipeAll(query interface{}, doc interface{}) error { profile := "mongodb.PipeAll" m.Context().ProfileStart(profile)