Skip to content

Commit

Permalink
Refactored subscriptions. Updated tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Paolo Scanferla committed May 13, 2014
1 parent cded01e commit f8808c4
Show file tree
Hide file tree
Showing 17 changed files with 1,159 additions and 905 deletions.
115 changes: 70 additions & 45 deletions dist/asteroid.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ Asteroid.prototype._init = function () {
// Emit the connected event
self._emit("connected");
});
self.ddp.on("reconnected", function () {
// Upon reconnection, try resuming the login
// Save the pormise it returns
self.resumeLoginPromise = self._tryResumeLogin();
// Re-establish all previously established (and still active) subscriptions
self._reEstablishSubscriptions();
// Emit the reconnected event
self._emit("reconnected");
});
self.ddp.on("added", function (data) {
self._onAdded(data);
});
Expand Down Expand Up @@ -244,52 +253,7 @@ Asteroid.prototype._onChanged = function (data) {



///////////////////////////////////////
// Subscribe and unsubscribe methods //
///////////////////////////////////////

Asteroid.prototype.subscribe = function (name /* , param1, param2, ... */) {
// Assert arguments type
must.beString(name);
// If we're already subscribed, unsubscribe before re-subscribing
var subPromise = this.subscriptions[name];
if (subPromise && subPromise.isFulfilled()) {
var subId = subPromise.inspect().value;
this.unsubscribe(subId);
}
// If the promise is pending, return it
if (subPromise && subPromise.isPending()) {
return subPromise;
}
// Init the promise that will be returned
var deferred = Q.defer();
// Keep a reference to the subscription
this.subscriptions[name] = deferred.promise;
// Get the paramteres for the subscription
var params = Array.prototype.slice.call(arguments, 1);
// Subscribe via DDP
this.ddp.sub(name, params, function (err, id) {
// This is the onReady/onNoSub callback
if (err) {
// Reject the promise if the server answered nosub
deferred.reject(err);
} else {
// Resolve the promise if the server answered ready
deferred.resolve(id);
}
});
// Return the promise
return this.subscriptions[name];
};

Asteroid.prototype.unsubscribe = function (id) {
// Assert arguments type
must.beString(id);
// Just send a ddp unsub message. We don't care about
// the response because the server doesn't give any
// meaningful response
this.ddp.unsub(id);
};



Expand Down Expand Up @@ -896,6 +860,67 @@ Set.prototype.toHash = function () {

Asteroid.Set = Set;

////////////////////////
// Subscription class //
////////////////////////

var Subscription = function (name, params, asteroid) {
this._name = name;
this._params = params;
this._asteroid = asteroid;
// Subscription promises
this._ready = Q.defer();
this.ready = this._ready.promise;
// Subscribe via DDP
var or = this._onReady.bind(this);
var os = this._onStop.bind(this);
var oe = this._onError.bind(this);
this.id = asteroid.ddp.sub(name, params, or, os, oe);
};
Subscription.constructor = Subscription;

Subscription.prototype.stop = function () {
this._asteroid.ddp.unsub(this.id);
};

Subscription.prototype._onReady = function () {
this._ready.resolve();
};

Subscription.prototype._onStop = function () {
delete this._asteroid.subscriptions[this.id];
};

Subscription.prototype._onError = function (err) {
if (this.ready.isPending()) {
this._ready.reject(err);
}
delete this._asteroid.subscriptions[this.id];
};



//////////////////////
// Subscribe method //
//////////////////////

Asteroid.prototype.subscribe = function (name /* , param1, param2, ... */) {
// Assert arguments type
must.beString(name);
// Collect arguments into array
var params = Array.prototype.slice.call(arguments, 1);
var sub = new Subscription(name, params, this);
this.subscriptions[sub.id] = sub;
return sub;
};

Asteroid.prototype._reEstablishSubscriptions = function () {
var subs = this.subscriptions;
for (var id in subs) {
subs[id] = new Subscription(subs[id]._name, subs[id]._params, this);
}
};

return Asteroid;

}));
2 changes: 1 addition & 1 deletion dist/asteroid.min.js

Large diffs are not rendered by default.

54 changes: 9 additions & 45 deletions src/asteroid.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ Asteroid.prototype._init = function () {
// Emit the connected event
self._emit("connected");
});
self.ddp.on("reconnected", function () {
// Upon reconnection, try resuming the login
// Save the pormise it returns
self.resumeLoginPromise = self._tryResumeLogin();
// Re-establish all previously established (and still active) subscriptions
self._reEstablishSubscriptions();
// Emit the reconnected event
self._emit("reconnected");
});
self.ddp.on("added", function (data) {
self._onAdded(data);
});
Expand Down Expand Up @@ -129,52 +138,7 @@ Asteroid.prototype._onChanged = function (data) {



///////////////////////////////////////
// Subscribe and unsubscribe methods //
///////////////////////////////////////

Asteroid.prototype.subscribe = function (name /* , param1, param2, ... */) {
// Assert arguments type
must.beString(name);
// If we're already subscribed, unsubscribe before re-subscribing
var subPromise = this.subscriptions[name];
if (subPromise && subPromise.isFulfilled()) {
var subId = subPromise.inspect().value;
this.unsubscribe(subId);
}
// If the promise is pending, return it
if (subPromise && subPromise.isPending()) {
return subPromise;
}
// Init the promise that will be returned
var deferred = Q.defer();
// Keep a reference to the subscription
this.subscriptions[name] = deferred.promise;
// Get the paramteres for the subscription
var params = Array.prototype.slice.call(arguments, 1);
// Subscribe via DDP
this.ddp.sub(name, params, function (err, id) {
// This is the onReady/onNoSub callback
if (err) {
// Reject the promise if the server answered nosub
deferred.reject(err);
} else {
// Resolve the promise if the server answered ready
deferred.resolve(id);
}
});
// Return the promise
return this.subscriptions[name];
};

Asteroid.prototype.unsubscribe = function (id) {
// Assert arguments type
must.beString(id);
// Just send a ddp unsub message. We don't care about
// the response because the server doesn't give any
// meaningful response
this.ddp.unsub(id);
};



Expand Down
60 changes: 60 additions & 0 deletions src/subscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
////////////////////////
// Subscription class //
////////////////////////

var Subscription = function (name, params, asteroid) {
this._name = name;
this._params = params;
this._asteroid = asteroid;
// Subscription promises
this._ready = Q.defer();
this.ready = this._ready.promise;
// Subscribe via DDP
var or = this._onReady.bind(this);
var os = this._onStop.bind(this);
var oe = this._onError.bind(this);
this.id = asteroid.ddp.sub(name, params, or, os, oe);
};
Subscription.constructor = Subscription;

Subscription.prototype.stop = function () {
this._asteroid.ddp.unsub(this.id);
};

Subscription.prototype._onReady = function () {
this._ready.resolve();
};

Subscription.prototype._onStop = function () {
delete this._asteroid.subscriptions[this.id];
};

Subscription.prototype._onError = function (err) {
if (this.ready.isPending()) {
this._ready.reject(err);
}
delete this._asteroid.subscriptions[this.id];
};



//////////////////////
// Subscribe method //
//////////////////////

Asteroid.prototype.subscribe = function (name /* , param1, param2, ... */) {
// Assert arguments type
must.beString(name);
// Collect arguments into array
var params = Array.prototype.slice.call(arguments, 1);
var sub = new Subscription(name, params, this);
this.subscriptions[sub.id] = sub;
return sub;
};

Asteroid.prototype._reEstablishSubscriptions = function () {
var subs = this.subscriptions;
for (var id in subs) {
subs[id] = new Subscription(subs[id]._name, subs[id]._params, this);
}
};
Loading

0 comments on commit f8808c4

Please sign in to comment.