diff --git a/README.md b/README.md index 7246c69..3542181 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ node-qpid ============ -A Node.js native wrapper around Apache Qpid, specifically the AMQP 1.0 Proton C API +A Node.js native wrapper around Apache Qpid, specifically the AMQP 1.0 Proton C API. ## Usage @@ -43,11 +43,42 @@ The module has been preliminarily tested against: ## Installation -It expects you to already have version 0.3 of the [qpid-proton library](http://qpid.apache.org/proton/) installed on your (Linux) system. +This module expects that you have the following installed on your system: +* [qpid-proton library](http://qpid.apache.org/proton/) version 0.7 +* nodejs-dev +* necessary build tools +Building qpid-proton is fairly involved, so to ease that burden, grabbing and installing +the binaries from debian sid is a faster (and probably less error prone) approach. There +is a PPA for qpid/released, however the libraries there are not compiled against OpenSSL +and SSL support is needed for connecting to Service Bus using AMQP. + +For Ubuntu 14.04, you can set up all the necessary requirements as follows: + +```bash +# wget http://ftp.us.debian.org/debian/pool/main/q/qpid-proton/libqpid-proton2-dev_0.7-1_amd64.deb +# wget http://ftp.us.debian.org/debian/pool/main/q/qpid-proton/libqpid-proton2_0.7-1_amd64.deb +# sudo dpkg -i libqpid-proton2-dev_0.7-1_amd64.deb libqpid-proton2_0.7-1_amd64.deb +# sudo apt-get install build-essential git +# sudo apt-get install nodejs-legacy nodejs-dev nodejs npm +``` + +Until this module is committed back to main branch and published as an npm module, to build and install it, do the following: + +```bash +# mkdir tmp +# cd tmp +# git clone https://github.com/jmspring/node-qpid.git +# cd node-qpid +# sudo npm install -g . +``` + +At this point, you are ready to use the module. ## Issues +Grabbing binaries directly from debian sid for Ubuntu is a little silly, but for now it works. + It's still rough around the edges and not ready for prime time, but pull requests are welcomed! ## Acknowledgements diff --git a/binding.gyp b/binding.gyp index 0968737..9ef2d17 100644 --- a/binding.gyp +++ b/binding.gyp @@ -2,10 +2,50 @@ "targets": [ { "target_name": "cproton", - "sources": [ "src/cproton.cc", "src/messenger.cc" ], - "link_settings" : { - "libraries": [ '-lqpid-proton' ] - } + "type": "loadable_module", + "sources": [ "src/cproton.cc", "src/sending.cc", "src/messenger.cc", "src/protondata.cc" ], + + 'conditions': [ + ['OS=="linux"', { + "link_settings" : { + "libraries": [ + '-L/usr/local/qpid-proton/lib', + '-lqpid-proton' + ], + }, + "include_dirs": [ + '/usr/include/nodejs/src', + '/usr/include/nodejs/deps/uv/include/', + '/usr/include/nodejs/deps/v8/include/', + '/usr/local/qpid-proton/include' + ], + "cflags": [ + "-fPIC" + ] + }], + ['OS=="mac"', { + 'defines!': [ + 'PLATFORM="mac"', + ], + 'defines': [ + # we need to use node's preferred "darwin" rather than gyp's preferred "mac" + 'PLATFORM="darwin"', + ], + "link_settings" : { + "libraries": [ + '-lqpid-proton', + '-L/usr/local/qpid-proton/lib64' + ], + }, + "include_dirs": [ + '/usr/local/qpid-proton/include', + '/usr/local/node/include/node', + ], + "cflags": [ + "-fPIC" + ] + }] + ] } ] } diff --git a/examples/recv.js b/examples/recv.js index 2409cea..2e0a566 100644 --- a/examples/recv.js +++ b/examples/recv.js @@ -22,4 +22,4 @@ m.on('message', function(message) { console.log(message.body); }); -m.subscribe(optimist.argv.address).receive(); +m.subscribe(optimist.argv.address, {}).receive(); diff --git a/examples/send.js b/examples/send.js index 0bbe194..9f00dcd 100644 --- a/examples/send.js +++ b/examples/send.js @@ -1,4 +1,7 @@ var Messenger = require('..').proton.Messenger; +var crypto = require('crypto'); +var async = require('async'); + var optimist = require('optimist') .options('a', { alias : 'address', @@ -20,8 +23,56 @@ m.on('error', function(err) { console.log("Error: " + err.message); }); -m.send({address: optimist.argv.address, body: message}, function(err) { - if (err) { - console.log("Error sending message: " + err.message); +var messages = {}; +var size = 128; +var count = 10000; +for(var i = 0; i < count; i++) { + messages[i] = { + id: i, + content: new Buffer(crypto.randomBytes(size)).toString('base64') + }; +} +console.log("Message size -- " + messages[0].content.length); + +var processed = 0; +function watcher() { + if(processed == count) { + console.log("Total time for %d messages: %d, errors: %d", count, end - start, errors); + } else { + console.log("%d of %d processed, errors: %d", processed, count, errors); + setTimeout(watcher, 5000); } -}); +} + +setTimeout(watcher, 5000); + +var sent = count; +var end; +var start = new Date().getTime(); +var errors = 0; +for(var i = 0; i < count; i++) { + function send_n(n) { + m.send({ address: optimist.argv.address, body: messages[n].content }, function(err, val) { + if(!err) { + end = new Date().getTime(); + } else { + if(err == "aborted") { + m.send(val, function(err, val) { + if(err) { + errors++; + console.log("ERR -- " + n + " : " + err); + } else { + end = new Date().getTime(); + } + }); + } else { + errors++; + console.log("ERR -- " + err); + } + } + processed++; + }); + } + send_n(i); +} +console.log("yup"); \ No newline at end of file diff --git a/package.json b/package.json index d341984..32d460e 100644 --- a/package.json +++ b/package.json @@ -9,10 +9,12 @@ "url": "https://github.com/pofallon/node-qpid.git" }, "devDependencies": { - "optimist": "*" + "optimist": "*", + "nconf": "*", + "async": "*" }, "engine": ">= 0.6.13 && < 0.11.0", - "version": "0.0.1", + "version": "0.0.3", "files": [ "index.js", "binding.gyp", diff --git a/src/async.h b/src/async.h index 424075a..af326af 100644 --- a/src/async.h +++ b/src/async.h @@ -16,7 +16,7 @@ template class Async { protected: uv_async_t watcher; - NODE_CPROTON_MUTEX_t + NODE_CPROTON_MUTEX_t(mutex); std::vector data; Callback callback; public: @@ -26,7 +26,7 @@ template class Async { Async(Parent* parent_, Callback cb_) : callback(cb_), parent(parent_) { watcher.data = this; - NODE_CPROTON_MUTEX_INIT + NODE_CPROTON_MUTEX_INIT(mutex) uv_async_init(uv_default_loop(), &watcher, listener); } @@ -84,7 +84,7 @@ template class Async { } ~Async() { - NODE_CPROTON_MUTEX_DESTROY + NODE_CPROTON_MUTEX_DESTROY(mutex) } }; diff --git a/src/macros.h b/src/macros.h index 43d87d9..c407430 100644 --- a/src/macros.h +++ b/src/macros.h @@ -48,6 +48,14 @@ } \ String::Utf8Value var(args[i]->ToString()); +#define REQUIRE_ARGUMENT_ARRAY(i, var) \ + if (args.Length() <= (i) || !args[i]->IsArray()) { \ + return ThrowException(Exception::TypeError( \ + String::New("Argument " #i " must be an array")) \ + ); \ + } \ + Local var = Local::Cast(args[i]) + /* Would really like to know how to write this... :-( #define OPTIONAL_ARGUMENT_STRING(i, var) \ */ @@ -63,6 +71,13 @@ var = Local::Cast(args[i]); \ } +#define OPTIONAL_ARGUMENT_OBJECT(i, var) \ + if (args.Length() <= (i) || !args[i]->IsObject()) { \ + return ThrowException(Exception::TypeError( \ + String::New("Argument " #i " must be an object")) \ + ); \ + } \ + Local var = args[i]->ToObject(); #define OPTIONAL_ARGUMENT_INTEGER(i, var, default) \ int var; \ diff --git a/src/messenger.cc b/src/messenger.cc index 806218f..d7fdcd4 100644 --- a/src/messenger.cc +++ b/src/messenger.cc @@ -2,17 +2,35 @@ #include #include #include +#include +#include +#include +#include + +#include +#include +#include + #include "messenger.h" #include "async.h" - using namespace v8; using namespace node; using namespace std; -Messenger::Messenger() { }; - -// Messenger::~Messenger() { }; +Messenger::Messenger() { + NODE_CPROTON_MUTEX_INIT(mutex); + messageSender = new MessageSender(this); + messageSettler = new MessageSettler(this); +}; + +Messenger::~Messenger() { + pn_messenger_stop(messenger); + pn_messenger_stop(receiver); + delete(messageSender); + delete(messageSettler); + NODE_CPROTON_MUTEX_DESTROY(mutex); +} Persistent Messenger::constructor_template; @@ -29,6 +47,7 @@ void Messenger::Init(Handle target) { NODE_SET_PROTOTYPE_METHOD(constructor_template, "subscribe", Subscribe); NODE_SET_PROTOTYPE_METHOD(constructor_template, "receive", Receive); NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", Stop); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "addsourcefilter", AddSourceFilter); target->Set(String::NewSymbol("Messenger"), constructor_template->GetFunction()); @@ -41,13 +60,15 @@ Handle Messenger::New(const Arguments& args) { Messenger* msgr = new Messenger(); pn_messenger_t* messenger = pn_messenger(NULL); + pn_messenger_set_blocking(messenger, false); pn_messenger_start(messenger); msgr->messenger = messenger; - // Temporary fix - pn_messenger_set_outgoing_window(msgr->messenger, 1); + // Temporary fix -- should tune this + pn_messenger_set_outgoing_window(msgr->messenger, 50); pn_messenger_t* receiver = pn_messenger(NULL); + pn_messenger_set_incoming_window(receiver, 1); pn_messenger_start(receiver); msgr->receiver = receiver; @@ -63,22 +84,85 @@ Handle Messenger::New(const Arguments& args) { return args.This(); } +Messenger::Subscription *Messenger::GetSubscriptionByAddress(std::string addr) { + Subscription *rval = NULL; + std::map::iterator it = _addressToSubscriptionMap.find(addr); + if(it != _addressToSubscriptionMap.end()) { + if(it->second < _subscriptions.size()) { + rval = _subscriptions.at(it->second); + } + } + return rval; +} + +Messenger::Subscription *Messenger::GetSubscriptionByIndex(unsigned long idx) { + Subscription *rval = NULL; + if(idx < _subscriptions.size()) { + rval = _subscriptions.at(idx); + } + return rval; +} + +Messenger::Subscription *Messenger::GetSubscriptionByHandle(pn_subscription_t *sub) { + Subscription *rval = NULL; + std::map::iterator it = _handleToSubscriptionMap.find(sub); + if(it != _handleToSubscriptionMap.end()) { + if(it->second < _subscriptions.size()) { + rval = _subscriptions.at(it->second); + } + } + return rval; +} + +unsigned long Messenger::AddSubscription(Messenger::Subscription *sub) { + unsigned long idx = ULONG_MAX; + if(sub) { + _subscriptions.push_back(sub); + idx = _subscriptions.size() - 1; + _addressToSubscriptionMap.insert( std::pair( sub->address, idx )); + } + return idx; +} + +bool Messenger::SetSubscriptionHandle(unsigned long idx, pn_subscription_t *sub) { + bool rval = false; + if(sub) { + std::map::iterator it = _handleToSubscriptionMap.find(sub); + if(it == _handleToSubscriptionMap.end()) { + _handleToSubscriptionMap.insert( std::pair( sub, idx )); + rval = true; + } + } + return rval; +} + Handle Messenger::Subscribe(const Arguments& args) { HandleScope scope; Messenger* msgr = ObjectWrap::Unwrap(args.This()); REQUIRE_ARGUMENT_STRING(0, addr); - OPTIONAL_ARGUMENT_FUNCTION(1, callback); - - msgr->address = *addr; - - SubscribeBaton* baton = new SubscribeBaton(msgr, callback, *addr); + REQUIRE_ARGUMENT_OBJECT(1, bag); + OPTIONAL_ARGUMENT_FUNCTION(2, callback); + pn_data_t *filter_value = NULL; + if(bag->Has(String::New("sourceFilter"))) { + Handle obj = bag->Get(String::New("sourceFilter")); + if(obj->IsArray()) { + filter_value = ProtonData::ParseJSData(obj); + } else { + // TODO -- unsupported + } + } + + NODE_CPROTON_MUTEX_LOCK(&msgr->mutex); + int subIdx = msgr->AddSubscription(new Subscription(*addr, callback)); + NODE_CPROTON_MUTEX_UNLOCK(&msgr->mutex); + Local var; + SubscribeBaton *baton = new SubscribeBaton(msgr, subIdx, filter_value, var); Work_BeginSubscribe(baton); - + return args.This(); - } void Messenger::Work_BeginSubscribe(Baton* baton) { @@ -90,46 +174,140 @@ void Messenger::Work_BeginSubscribe(Baton* baton) { } void Messenger::Work_Subscribe(uv_work_t* req) { - SubscribeBaton* baton = static_cast(req->data); - pn_messenger_subscribe(baton->msgr->receiver, baton->address.c_str()); - + NODE_CPROTON_MUTEX_LOCK(&baton->msgr->mutex) + Subscription *subscription = baton->msgr->GetSubscriptionByIndex(baton->subscriptionIndex); + if(subscription != NULL) { + pn_subscription_t * sub = pn_messenger_subscribe(baton->msgr->receiver, subscription->address.c_str()); + if(sub) { + baton->msgr->SetSubscriptionHandle(baton->subscriptionIndex, sub); + if(baton->filter_value) { + baton->msgr->SetSourceFilter(subscription->address, baton->filter_value); + } + } else { + // TODO -- error getting subscription? + } + } else { + // TODO -- something bad happened + } + NODE_CPROTON_MUTEX_UNLOCK(&baton->msgr->mutex) } void Messenger::Work_AfterSubscribe(uv_work_t* req) { + HandleScope scope; SubscribeBaton* baton = static_cast(req->data); - + + NODE_CPROTON_MUTEX_LOCK(&baton->msgr->mutex) + Subscription *subscription = baton->msgr->GetSubscriptionByIndex(baton->subscriptionIndex); + if(subscription) { + if(!subscription->callback.IsEmpty() && subscription->callback->IsFunction()) { + Local args[] = { String::New(subscription->address.c_str()) }; + subscription->callback->Call(Context::GetCurrent()->Global(), 1, args); + } else { + Local args[] = { String::New("subscribed"), String::New(subscription->address.c_str()) }; + EMIT_EVENT(baton->msgr->handle_, 2, args); + } + } baton->msgr->subscriptions++; + NODE_CPROTON_MUTEX_UNLOCK(&baton->msgr->mutex) +#if 0 +// TODO -- add error handling??? if (baton->error_code > 0) { /* Local err = Exception::Error( String::New(baton->error_message.c_str())); Local argv[2] = { Local::New(String::New("error")), err }; MakeCallback(baton->obj, "emit", 2, argv); */ - } else { - if (!baton->callback.IsEmpty() && baton->callback->IsFunction()) { - - Local args[] = { Local::New(Null()), String::New(baton->address.c_str()) }; - baton->callback->Call(Context::GetCurrent()->Global(), 2, args); - - } else { + } +#endif - Local args[] = { String::New("subscribed"), String::New(baton->address.c_str()) }; - EMIT_EVENT(baton->msgr->handle_, 2, args); + if (baton->msgr->receiveWait) { + Work_BeginReceive(baton->msgr->receiveWaitBaton); + } + + delete baton; +} + + + + +Handle Messenger::AddSourceFilter(const Arguments& args) { + HandleScope scope; + + Messenger* msgr = ObjectWrap::Unwrap(args.This()); + + REQUIRE_ARGUMENT_STRING(0, addr); + REQUIRE_ARGUMENT_OBJECT(1, filter); + OPTIONAL_ARGUMENT_FUNCTION(2, callback); + + pn_data_t *value = ProtonData::ParseJSData(filter); + + if(value) { + AddSourceFilterBaton *baton = new AddSourceFilterBaton(msgr, callback, *addr, value); + Work_BeginAddSourceFilter(baton); + } + + return args.This(); +} + +void Messenger::Work_BeginAddSourceFilter(Baton* baton) { + int status = uv_queue_work(uv_default_loop(), + &baton->request, Work_AddSourceFilter, (uv_after_work_cb)Work_AfterAddSourceFilter); + + assert(status == 0); +} + +void Messenger::SetSourceFilter(std::string & address, pn_data_t *filter_value) { + if(filter_value) { + pn_link_t *link = pn_messenger_get_link(this->receiver, address.c_str(), 0); + if(link) { + pn_terminus_t *sr = pn_link_source(link); + if(sr) { + pn_data_t *filter = pn_terminus_filter(sr); + if(filter) { + pn_data_copy(filter, filter_value); + } + } } } +} - if (baton->msgr->receiveWait) { +void Messenger::Work_AddSourceFilter(uv_work_t* req) { - Work_BeginReceive(baton->msgr->receiveWaitBaton); + AddSourceFilterBaton* baton = static_cast(req->data); + + // TODO - add check for subscribed to address + NODE_CPROTON_MUTEX_LOCK(&baton->msgr->mutex) + baton->msgr->SetSourceFilter(baton->address, baton->filter_value); + NODE_CPROTON_MUTEX_UNLOCK(&baton->msgr->mutex) + +} +void Messenger::Work_AfterAddSourceFilter(uv_work_t* req) { + + AddSourceFilterBaton* baton = static_cast(req->data); + + if(baton->filter_value) { + pn_data_free(baton->filter_value); + baton->filter_value = NULL; } - delete baton; + if (!baton->callback.IsEmpty() && baton->callback->IsFunction()) { + + Local args[] = { Local::New(Null()), String::New(baton->address.c_str()) }; + baton->callback->Call(Context::GetCurrent()->Global(), 2, args); + + } else { + Local args[] = { String::New("addedsourcefilter"), String::New(baton->address.c_str()) }; + EMIT_EVENT(baton->msgr->handle_, 2, args); + + } + + delete baton; } Handle Messenger::Send(const Arguments& args) { @@ -139,55 +317,36 @@ Handle Messenger::Send(const Arguments& args) { REQUIRE_ARGUMENT_OBJECT(0, obj); OPTIONAL_ARGUMENT_FUNCTION(1, callback); + pn_message_t *msg = JSToMessage(obj); - pn_message_t* msg = JSToMessage(obj); - - SendBaton* baton = new SendBaton(msgr, callback, msg); - - Work_BeginSend(baton); + InFlightMessage *ifmsg = new InFlightMessage(obj, msg, callback); + msgr->messageSender->AppendMessage(ifmsg); return Undefined(); - } + void Messenger::Work_BeginSend(Baton* baton) { + SendBaton * send_baton = static_cast(baton); + int status = uv_queue_work(uv_default_loop(), &baton->request, Work_Send, (uv_after_work_cb)Work_AfterSend); assert(status == 0); - } void Messenger::Work_Send(uv_work_t* req) { SendBaton* baton = static_cast(req->data); - pn_messenger_t* messenger = baton->msgr->messenger; - pn_message_t* message = baton->msg; - - assert(!pn_messenger_put(messenger, message)); - baton->tracker = pn_messenger_outgoing_tracker(messenger); - - assert(!pn_messenger_send(messenger)); - - pn_message_free(message); - + } void Messenger::Work_AfterSend(uv_work_t* req) { HandleScope scope; + SendBaton* baton = static_cast(req->data); - - if (baton->error_code > 0) { - Local err = Exception::Error(String::New(baton->error_message.c_str())); - Local argv[] = { err }; - baton->callback->Call(Context::GetCurrent()->Global(), 1, argv); - } else { - Local argv[] = {}; - baton->callback->Call(Context::GetCurrent()->Global(), 0, argv); - } - + delete baton; - } Handle Messenger::Receive(const Arguments& args) { @@ -253,19 +412,29 @@ void Messenger::Work_Receive(uv_work_t* req) { while (baton->msgr->receiving) { - pn_messenger_recv(receiver, 1024); + pn_messenger_recv(receiver, 150); while(pn_messenger_incoming(receiver)) { pn_message_t* message = pn_message(); pn_messenger_get(receiver, message); + pn_subscription_t *subscription = pn_messenger_incoming_subscription(receiver); + + MessageInfo *mi = new MessageInfo(); + mi->message = message; NODE_CPROTON_MUTEX_LOCK(&async->mutex) - async->data.push_back(message); + Messenger::Subscription *sub = baton->msgr->GetSubscriptionByHandle(subscription); + if(sub) { + mi->subscription_address = sub->address; + } + async->data.push_back(mi); NODE_CPROTON_MUTEX_UNLOCK(&async->mutex) uv_async_send(&async->watcher); - + + pn_tracker_t tracker = pn_messenger_incoming_tracker(receiver); + pn_messenger_accept(receiver, tracker, PN_CUMULATIVE); } } @@ -290,18 +459,18 @@ void Messenger::AsyncReceive(uv_async_t* handle, int status) { break; } - Local argv[2]; + Local argv[3]; Messages::const_iterator it = messages.begin(); Messages::const_iterator end = messages.end(); for (int i = 0; it < end; it++, i++) { argv[0] = String::NewSymbol("message"); - argv[1] = MessageToJS(*it); - TRY_CATCH_CALL(async->msgr->handle_, async->emitter, 2, argv) - pn_message_free(*it); - // delete *it; - + argv[1] = MessageToJS((*it)->message); + argv[2] = String::New((*it)->subscription_address.c_str()); + TRY_CATCH_CALL(async->msgr->handle_, async->emitter, 3, argv) + pn_message_free((*it)->message); + delete (*it); } } @@ -387,9 +556,17 @@ pn_message_t* Messenger::JSToMessage(Local obj) { pn_data_put_string(msg_body, pn_bytes(body.length(), str_body)); } + + if (obj->Has(String::New("annotations"))) { + + Handle annotations(obj->Get(String::New("annotations"))); + + pn_data_t *anndata = ProtonData::ParseJSData(annotations); + pn_data_t *msgann = pn_message_annotations(message); + pn_data_copy(msgann, anndata); + } return(message); - } Local Messenger::MessageToJS(pn_message_t* message) { @@ -397,12 +574,140 @@ Local Messenger::MessageToJS(pn_message_t* message) { Local result(Object::New()); size_t buffsize = 1024; - char buffer[buffsize]; + char buffer[1024]; + + // handle the body pn_data_t *body = pn_message_body(message); pn_data_format(body, buffer, &buffsize); - result->Set(String::NewSymbol("body"), Local(String::New(buffer))); + + // check for annotations + pn_data_t *annotations = pn_message_annotations(message); + if(annotations && pn_data_size(annotations) > 0) { + Handle anno = ProtonData::ParsePnData(annotations); + result->Set(String::NewSymbol("annotations"), anno); + } + + // check for properties + pn_data_t *properties = pn_message_properties(message); + if(properties && pn_data_size(properties) > 0) { + Handle prop = ProtonData::ParsePnData(properties); + result->Set(String::NewSymbol("properties"), prop); + } + + return result; +} + +int Messenger::MessengerGetOutgoingWindow(void) +{ + int result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_get_outgoing_window(messenger); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + +int Messenger::MessengerGetOutgoing(void){ + int result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_outgoing(messenger); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + +bool Messenger::MessengerGetBuffered(pn_tracker_t tracker){ + bool result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_buffered(messenger, tracker); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + +pn_status_t Messenger::MessengerGetStatus(pn_tracker_t tracker){ + pn_status_t result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_status(messenger, tracker); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + +int Messenger::MessengerSettleOutgoing(pn_tracker_t tracker){ + int result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_settle(messenger, tracker, 0); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + +int Messenger::MessengerSend(){ + int result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_send(messenger, 0); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + +int Messenger::MessengerWork(){ + int result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_work(messenger, 0); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} +pn_tracker_t Messenger::MessengerGetOutgoingTracker(void){ + pn_tracker_t result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_outgoing_tracker(messenger); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) return result; +} + +int Messenger::MessengerPut(pn_message_t *msg){ + int result; + NODE_CPROTON_MUTEX_LOCK(&mutex) + result = pn_messenger_put(messenger, msg); + NODE_CPROTON_MUTEX_UNLOCK(&mutex) + return result; +} + + +std::string Messenger::MapErrorToString(int err) { + switch(err) { + case MSG_ERROR_NONE: + return std::string("none"); + case MSG_ERROR_INTERNAL: + return std::string("internal error"); + case MSG_ERROR_REJECTED: + return std::string("rejected"); + case MSG_ERROR_STATUS_UNKNOWN: + return std::string("status unknown"); + case MSG_ERROR_ABORTED: + return std::string("aborted"); + } + return std::string("unknown error"); +} + +int Messenger::MapPNStatusToError(pn_status_t status) +{ + switch(status) { + case PN_STATUS_UNKNOWN: + return MSG_ERROR_STATUS_UNKNOWN; + case PN_STATUS_PENDING: + return MSG_ERROR_NONE; + case PN_STATUS_ACCEPTED: + return MSG_ERROR_NONE; + case PN_STATUS_REJECTED: + return MSG_ERROR_REJECTED; + case PN_STATUS_RELEASED: + return MSG_ERROR_NONE; + case PN_STATUS_MODIFIED: + return MSG_ERROR_NONE; + case PN_STATUS_ABORTED: + return MSG_ERROR_ABORTED; + case PN_STATUS_SETTLED: + return MSG_ERROR_NONE; + } + return MSG_ERROR_INTERNAL; } diff --git a/src/messenger.h b/src/messenger.h index 176777a..2f0f4d0 100644 --- a/src/messenger.h +++ b/src/messenger.h @@ -3,6 +3,8 @@ #include #include +#include +#include #include @@ -11,11 +13,24 @@ #include "macros.h" #include "threading.h" +#include "protondata.h" +//#include "sending.h" using namespace v8; using namespace node; -typedef std::vector Messages; +#define MSG_ERROR_NONE 0 +#define MSG_ERROR_INTERNAL -1 +#define MSG_ERROR_REJECTED -2 +#define MSG_ERROR_STATUS_UNKNOWN -3 +#define MSG_ERROR_ABORTED -4 + +struct MessageInfo { + pn_message_t *message; + std::string subscription_address; +}; + +typedef std::vector Messages; class Messenger : public node::ObjectWrap { public: @@ -47,21 +62,50 @@ class Messenger : public node::ObjectWrap { }; struct SubscribeBaton : Baton { + int subscriptionIndex; + pn_data_t *filter_value; + + SubscribeBaton(Messenger* msgr_, int subIndex, pn_data_t *value, Handle cb_) : + Baton(msgr_, cb_), + subscriptionIndex(subIndex), + filter_value(value) {}; + + ~SubscribeBaton() { + if(filter_value) { + pn_data_free(filter_value); + } + filter_value = NULL; + } + }; + + struct AddSourceFilterBaton : Baton { std::string address; - - SubscribeBaton(Messenger* msgr_, Handle cb_, const char* address_) : - Baton(msgr_, cb_), address(address_) {} + pn_data_t *filter_value; + + AddSourceFilterBaton(Messenger* msgr_, Handle cb_, const char* address_, pn_data_t *filter_value_) : + Baton(msgr_, cb_), address(address_), filter_value(filter_value_) {} + + ~AddSourceFilterBaton() { + if(filter_value) { + pn_data_free(filter_value); + } + filter_value = NULL; + } }; struct SendBaton : Baton { + + SendBaton(Messenger* msgr_, Handle cb_) : + Baton(msgr_, cb_) {} - pn_message_t * msg; - pn_tracker_t tracker; + }; - SendBaton(Messenger* msgr_, Handle cb_, pn_message_t * msg_) : - Baton(msgr_, cb_), msg(msg_) {} + struct SendSettlingBaton : Baton { + + SendSettlingBaton(Messenger* msgr_, Handle cb_) : + Baton(msgr_, cb_) {} }; @@ -80,7 +124,7 @@ class Messenger : public node::ObjectWrap { uv_async_t watcher; Messenger* msgr; Messages data; - NODE_CPROTON_MUTEX_t; + NODE_CPROTON_MUTEX_t(mutex); bool completed; int retrieved; @@ -91,7 +135,7 @@ class Messenger : public node::ObjectWrap { Async(Messenger* m, uv_async_cb async_cb) : msgr(m), completed(false), retrieved(0) { watcher.data = this; - NODE_CPROTON_MUTEX_INIT + NODE_CPROTON_MUTEX_INIT(mutex); msgr->Ref(); uv_async_init(uv_default_loop(), &watcher, async_cb); } @@ -99,23 +143,128 @@ class Messenger : public node::ObjectWrap { ~Async() { msgr->Unref(); emitter.Dispose(); - NODE_CPROTON_MUTEX_DESTROY + NODE_CPROTON_MUTEX_DESTROY(mutex) } }; + + struct Subscription { + Subscription(std::string address_, Handle cb_) : + address(address_), + callback(Persistent::New(cb_)) {}; + + std::string address; + Persistent callback; + }; + + Subscription *GetSubscriptionByAddress(std::string addr); + Subscription *GetSubscriptionByIndex(unsigned long idx); + Subscription *GetSubscriptionByHandle(pn_subscription_t *sub); + unsigned long AddSubscription(Subscription *sub); + bool SetSubscriptionHandle(unsigned long idx, pn_subscription_t *sub); + + void SetSourceFilter(std::string & address, pn_data_t *value); + + // protected access methods + int MessengerGetOutgoingWindow(void); + int MessengerGetOutgoing(void); + bool MessengerGetBuffered(pn_tracker_t tracker); + pn_status_t MessengerGetStatus(pn_tracker_t tracker); + int MessengerSettleOutgoing(pn_tracker_t tracker); + int MessengerSend(); + int MessengerWork(); + pn_tracker_t MessengerGetOutgoingTracker(void); + int MessengerPut(pn_message_t *msg); + + + + // error stuff + static int MapPNStatusToError(pn_status_t status); + static std::string MapErrorToString(int error); + + struct InFlightMessage { + InFlightMessage(Handle msg_, pn_message_t *pnmsg_, Handle cb_) : + pnmsg(pnmsg_), + retried(false), + error(0) { + callback = Persistent::New(cb_); + message = Persistent::New(msg_); + }; + + ~InFlightMessage() { + if(pnmsg) { + pn_message_free(pnmsg); + } + message.Dispose(); + callback.Dispose(); + } + + Persistent message; + Persistent callback; + pn_message_t *pnmsg; + pn_tracker_t tracker; + bool retried; + int error; +}; + +struct QueuedWorker { + QueuedWorker(Messenger *m); + ~QueuedWorker(); + + void AppendMessage(InFlightMessage *msg); + void AppendMessages(std::list *srcList, + std::list::iterator begin, + std::list::iterator end, + NODE_CPROTON_MUTEX_t *srcMutex); + +protected: + virtual void HandleQueue(void) = 0; + + Messenger *msgr; + std::list messageList; + NODE_CPROTON_MUTEX_t(mutex); + bool active; + uv_timer_t activityTimer; +}; + +struct MessageSender : QueuedWorker { + MessageSender(Messenger *m) : + QueuedWorker(m) {}; + virtual ~MessageSender() {}; + +protected: + virtual void HandleQueue(void); + static void ProcessSending(uv_timer_t* handle, int status); +}; + +struct MessageSettler : QueuedWorker { + MessageSettler(Messenger *m) : + QueuedWorker(m) {}; + + virtual ~MessageSettler() {}; + +protected: + virtual void HandleQueue(void); + static void ProcessSettling(uv_timer_t* handle, int status); +}; + + + + + + private: Messenger(); - ~Messenger() { - pn_messenger_stop(messenger); - pn_messenger_stop(receiver); - } + ~Messenger(); WORK_DEFINITION(Send) WORK_DEFINITION(Subscribe) + WORK_DEFINITION(AddSourceFilter) WORK_DEFINITION(Stop) WORK_DEFINITION(Put) WORK_DEFINITION(Receive) + WORK_DEFINITION(SendSettling) static void AsyncReceive(uv_async_t* handle, int status); static void CloseEmitter(uv_handle_t* handle); @@ -128,7 +277,16 @@ class Messenger : public node::ObjectWrap { pn_messenger_t * receiver; bool receiving; bool receiveWait; + NODE_CPROTON_MUTEX_t mutex; ReceiveBaton * receiveWaitBaton; + + MessageSender *messageSender; + MessageSettler *messageSettler; + + std::vector _subscriptions; + std::map _addressToSubscriptionMap; + std::map _handleToSubscriptionMap; + int subscriptions; }; diff --git a/src/protondata.cc b/src/protondata.cc new file mode 100644 index 0000000..241c524 --- /dev/null +++ b/src/protondata.cc @@ -0,0 +1,569 @@ +#include +#include +#include + +#include "protondata.h" + +pn_type_t ProtonData::JSTypeToPNType(std::string type) +{ + pn_type_t rtype = PN_NULL; + + if(type.compare("null") == 0) { + rtype = PN_NULL; + } else if(type.compare("bool") == 0) { + rtype = PN_BOOL; + } else if(type.compare("ubyte") == 0) { + rtype = PN_UBYTE; + } else if(type.compare("byte") == 0) { + rtype = PN_BYTE; + } else if(type.compare("ushort") == 0) { + rtype = PN_USHORT; + } else if(type.compare("short") == 0) { + rtype = PN_SHORT; + } else if(type.compare("uint") == 0) { + rtype = PN_UINT; + } else if(type.compare("int") == 0) { + rtype = PN_INT; + } else if(type.compare("char") == 0) { + rtype = PN_CHAR; + } else if(type.compare("ulong") == 0) { + rtype = PN_ULONG; + } else if(type.compare("long") == 0) { + rtype = PN_LONG; + } else if(type.compare("timestamp") == 0) { + rtype = PN_TIMESTAMP; + } else if(type.compare("float") == 0) { + rtype = PN_FLOAT; + } else if(type.compare("double") == 0) { + rtype = PN_DOUBLE; + } else if(type.compare("decimal32") == 0) { + rtype = PN_DECIMAL32; + } else if(type.compare("decimal64") == 0) { + rtype = PN_DECIMAL64; + } else if(type.compare("decimal128") == 0) { + rtype = PN_DECIMAL128; + } else if(type.compare("uuid") == 0) { + rtype = PN_UUID; + } else if(type.compare("binary") == 0) { + rtype = PN_BINARY; + } else if(type.compare("string") == 0) { + rtype = PN_STRING; + } else if(type.compare("symbol") == 0) { + rtype = PN_SYMBOL; + } else if(type.compare("described") == 0) { + rtype = PN_DESCRIBED; + } else if(type.compare("array") == 0) { + rtype = PN_ARRAY; + } else if(type.compare("list") == 0) { + rtype = PN_LIST; + } else if(type.compare("map") == 0) { + rtype = PN_MAP; + } + + return rtype; +} + +std::string ProtonData::PNTypeToJSType(pn_type_t type) +{ + std::string rtype; + + switch(type) + { + case PN_NULL: + rtype = std::string("null"); + break; + case PN_BOOL: + rtype = std::string("bool"); + break; + case PN_UBYTE: + rtype = std::string("ubyte"); + break; + case PN_BYTE: + rtype = std::string("byte"); + break; + case PN_USHORT: + rtype = std::string("ushort"); + break; + case PN_SHORT: + rtype = std::string("short"); + break; + case PN_UINT: + rtype = std::string("uint"); + break; + case PN_INT: + rtype = std::string("int"); + break; + case PN_CHAR: + rtype = std::string("char"); + break; + case PN_ULONG: + rtype = std::string("ulong"); + break; + case PN_LONG: + rtype = std::string("long"); + break; + case PN_TIMESTAMP: + rtype = std::string("timestamp"); + break; + case PN_FLOAT: + rtype = std::string("float"); + break; + case PN_DOUBLE: + rtype = std::string("double"); + break; + case PN_DECIMAL32: + rtype = std::string("decimal32"); + break; + case PN_DECIMAL64: + rtype = std::string("decimal64"); + break; + case PN_DECIMAL128: + rtype = std::string("decimal128"); + break; + case PN_UUID: + rtype = std::string("uuid"); + break; + case PN_BINARY: + rtype = std::string("binary"); + break; + case PN_STRING: + rtype = std::string("string"); + break; + case PN_SYMBOL: + rtype = std::string("symbol"); + break; + case PN_DESCRIBED: + rtype = std::string("described"); + break; + case PN_ARRAY: + rtype = std::string("array"); + break; + case PN_LIST: + rtype = std::string("list"); + break; + case PN_MAP: + rtype = std::string("map"); + break; + } + + return rtype; +} + +bool ProtonData::IsSimpleValue(pn_type_t type) +{ + bool rval = true; + switch(type) + { + case PN_DESCRIBED: + case PN_ARRAY: + case PN_LIST: + case PN_MAP: + rval = false; + break; + default: + ; + } + return rval; +} + +Handle ProtonData::GetSimpleValue(pn_data_t *data) +{ + HandleScope scope; + + switch(pn_data_type(data)) + { + case PN_NULL: + return Boolean::New(false); + case PN_BOOL: + return Boolean::New(pn_data_get_bool(data)); + case PN_UBYTE: + return Integer::NewFromUnsigned(pn_data_get_ubyte(data)); + case PN_BYTE: + return Integer::New(pn_data_get_byte(data)); + case PN_USHORT: + return Integer::NewFromUnsigned(pn_data_get_short(data)); + case PN_SHORT: + return Integer::New(pn_data_get_short(data)); + case PN_UINT: + return Integer::NewFromUnsigned(pn_data_get_uint(data)); + case PN_INT: + return Integer::New(pn_data_get_int(data)); + case PN_CHAR: + return Integer::New(pn_data_get_char(data)); + case PN_ULONG: + return Number::New(pn_data_get_ulong(data)); + case PN_LONG: + return Number::New(pn_data_get_long(data)); + case PN_TIMESTAMP: + return Number::New(pn_data_get_timestamp(data)); + case PN_FLOAT: + return Number::New(pn_data_get_float(data)); + case PN_DOUBLE: + return Number::New(pn_data_get_double(data)); + case PN_DECIMAL32: + return Number::New(pn_data_get_decimal32(data)); + case PN_DECIMAL64: + return Number::New(pn_data_get_decimal64(data)); + case PN_DECIMAL128: + { + pn_decimal128_t d128 = pn_data_get_decimal128(data); + return String::New(d128.bytes, 16); + } + case PN_UUID: + { + pn_uuid_t puu = pn_data_get_uuid(data); + return String::New(puu.bytes, 16); + } + case PN_BINARY: + case PN_STRING: + case PN_SYMBOL: + { + pn_bytes_t b; + if(pn_data_type(data) == PN_BINARY) { + b = pn_data_get_binary(data); + } else if(pn_data_type(data) == PN_STRING) { + b = pn_data_get_string(data); + } else { + b = pn_data_get_symbol(data); + } + return String::New(b.start, b.size); + } + default: + ; + } + return Null(); +} + +Handle ProtonData::GetDescribedValue(pn_data_t *data) +{ + HandleScope scope; + + pn_data_enter(data); + Handle description = ProtonData::ParsePnData(data); + pn_data_next(data); + Handle value = ProtonData::ParsePnData(data); + pn_data_exit(data); + + Handle t(Object::New()); + t->Set(String::New("description"), description); + t->Set(String::New("value"), value); + + return t; +} + +Handle ProtonData::GetArrayValue(pn_data_t *data) +{ + HandleScope scope; + + Handle t = Array::New(); + int idx = 0; + pn_data_enter(data); + while(pn_data_next(data)) { + t->Set(idx++, ProtonData::ParsePnData(data)); + } + + return t; +} + +Handle ProtonData::GetListValue(pn_data_t *data) +{ + return ProtonData::GetArrayValue(data); +} + +Handle ProtonData::GetMapValue(pn_data_t *data) +{ + HandleScope scope; + + Handle t(Array::New()); + + int nodes = pn_data_get_map(data); + if(nodes % 2) { + return Null(); + } else { + pn_data_enter(data); + for(int i = 0; i < nodes / 2; i++) { + t->Set(2*i, ParsePnData(data)); + t->Set(2*i+1, ParsePnData(data)); + } + } + + return t; +} + +Handle ProtonData::ParseValue(pn_data_t *data) +{ + if(ProtonData::IsSimpleValue(pn_data_type(data))) { + return GetSimpleValue(data); + } else if(pn_data_type(data) == PN_DESCRIBED) { + return GetDescribedValue(data); + } else if(pn_data_type(data) == PN_ARRAY) { + return GetArrayValue(data); + } else if(pn_data_type(data) == PN_LIST) { + return GetListValue(data); + } else if(pn_data_type(data) == PN_MAP) { + return GetMapValue(data); + } + return Null(); +} + +Local ProtonData::ParsePnData(pn_data_t *data) +{ + HandleScope scope; + Local t = Array::New(); + pn_data_next(data); + t->Set(0, String::New(PNTypeToJSType(pn_data_type(data)).c_str())); + t->Set(1, ParseValue(data)); + return scope.Close(t); +} + +pn_data_t *ProtonData::GetSimpleJSValue(pn_type_t type, Local jsval) +{ + pn_data_t *rval = pn_data(0); + + switch(type) + { + case PN_NULL: + pn_data_put_null(rval); + break; + case PN_BOOL: + pn_data_put_bool(rval, jsval->ToBoolean()->Value()); + break; + case PN_UBYTE: + pn_data_put_ubyte(rval, jsval->ToInteger()->Value()); + break; + case PN_BYTE: + pn_data_put_byte(rval, jsval->ToInteger()->Value()); + break; + case PN_USHORT: + pn_data_put_ushort(rval, jsval->ToInteger()->Value()); + break; + case PN_SHORT: + pn_data_put_short(rval, jsval->ToInteger()->Value()); + break; + case PN_UINT: + pn_data_put_uint(rval, jsval->ToUint32()->Value()); + break; + case PN_INT: + pn_data_put_int(rval, jsval->ToInteger()->Value()); + break; + case PN_CHAR: + pn_data_put_char(rval, jsval->ToInteger()->Value()); + break; + case PN_ULONG: + pn_data_put_ulong(rval, jsval->ToNumber()->Value()); + break; + case PN_LONG: + pn_data_put_long(rval, jsval->ToNumber()->Value()); + break; + case PN_TIMESTAMP: + pn_data_put_timestamp(rval, jsval->ToNumber()->Value()); + break; + case PN_FLOAT: + pn_data_put_float(rval, jsval->ToNumber()->Value()); + break; + case PN_DOUBLE: + pn_data_put_double(rval, jsval->ToNumber()->Value()); + break; + case PN_DECIMAL32: + pn_data_put_decimal32(rval, jsval->ToInt32()->Value()); + break; + case PN_DECIMAL64: + pn_data_put_decimal64(rval, jsval->ToNumber()->Value()); + break; + case PN_DECIMAL128: + { + pn_decimal128_t d128; + if(jsval->ToString()->Utf8Length() != 16) { + pn_data_free(rval); + rval = NULL; + } else { + jsval->ToString()->WriteUtf8(d128.bytes, 16); + pn_data_put_decimal128(rval, d128); + } + } + break; + case PN_UUID: + { + pn_uuid_t puu; + if(jsval->ToString()->Utf8Length() != 16) { + pn_data_free(rval); + rval = NULL; + } else { + jsval->ToString()->WriteUtf8(puu.bytes, 16); + pn_data_put_uuid(rval, puu); + } + } + break; + case PN_BINARY: + case PN_STRING: + case PN_SYMBOL: + { + pn_bytes_t b; + String::Utf8Value valstr(jsval->ToString()); + b.start = *valstr; + b.size = valstr.length(); + if(type == PN_BINARY) { + pn_data_put_binary(rval, b); + } else if(type == PN_STRING) { + pn_data_put_string(rval, b); + } else { + pn_data_put_symbol(rval, b); + } + } + default: + ; + } + + return rval; +} + +pn_data_t *ProtonData::GetDescribedJSValue(Handle array) +{ + pn_data_t *rval = NULL; + + if(array->Length() == 3) { + pn_data_t *description = ParseJSData(array->Get(1)); + if(description) { + pn_data_t *value = ParseJSData(array->Get(2)); + if(value) { + rval = pn_data(0); + pn_data_put_described(rval); + pn_data_enter(rval); + pn_data_append(rval, description); + pn_data_append(rval, value); + pn_data_exit(rval); + pn_data_rewind(rval); + } + pn_data_free(value); + } + pn_data_free(description); + } else { + // TODO - failure scenario + } + + return rval; +} + +pn_data_t *ProtonData::GetArrayOrListJSValue(pn_type_t type, Handle array) +{ + pn_data_t *rval = pn_data(0); + pn_type_t arraytype = PN_NULL; + int length = array->Length() - 1; + + if(type == PN_LIST) { + pn_data_put_list(rval); + } else { + if(length > 0) { + pn_data_t *data = ParseJSData(array->Get(1)); + if(data) { + arraytype = pn_data_type(data); + pn_data_free(data); + } else { + // TODO - handle error; + pn_data_free(rval); + return NULL; + } + } + pn_data_put_array(rval, false, arraytype); + } + + pn_data_enter(rval); + for(unsigned int i = 1; i < array->Length(); i++) { + pn_data_t *data = ParseJSData(array->Get(i)); + if(data) { + pn_data_append(rval, data); + } else { + // TODO - mention failure + } + } + pn_data_exit(rval); + pn_data_rewind(rval); + + return rval; +} + +pn_data_t *ProtonData::GetArrayJSValue(Handle array) +{ + return GetArrayOrListJSValue(PN_ARRAY, array); +} + +pn_data_t *ProtonData::GetListJSValue(Handle array) +{ + return GetArrayOrListJSValue(PN_LIST, array); +} + +pn_data_t *ProtonData::GetMapJSValue(Handle array) +{ + pn_data_t *rval = NULL; + int length = array->Length() - 1; + + if((length % 2) == 0) { + rval = pn_data(0); + pn_data_put_map(rval); + pn_data_enter(rval); + unsigned int nodecnt = length / 2; + for(unsigned int i = 0; i < nodecnt; i++) { + pn_data_t *key = ParseJSData(array->Get(i * 2 + 1)); + if(key) { + pn_data_t *value = ParseJSData(array->Get((i + 1) * 2)); + if(value) { + pn_data_append(rval, key); + pn_data_append(rval, value); + } else { + // TODO - log failure + pn_data_free(key); + } + } else { + // TODO - log failure + } + } + pn_data_exit(rval); + pn_data_rewind(rval); + } else { + // TODO - invalid number of entries + } + + return rval; +} + +pn_data_t *ProtonData::ParseJSData(Handle jsval) +{ + pn_data_t *rval = NULL; + + if(jsval->IsArray()) { + Handle array = Handle::Cast(jsval); + + String::Utf8Value typestr(array->Get(0)->ToString()); + pn_type_t type = JSTypeToPNType(std::string(*typestr)); + if(IsSimpleValue(type)) { + rval = GetSimpleJSValue(type, array->Get(1)); + } else if(type == PN_DESCRIBED) { + rval = GetDescribedJSValue(array); + } else { + Handle value = Handle::Cast(jsval); + if(type == PN_ARRAY) { + rval = GetArrayJSValue(value); + } else if(type == PN_LIST) { + rval = GetListJSValue(value); + } else if(type == PN_MAP) { + rval = GetMapJSValue(value); + } else { + // TODO -- failure? + return NULL; + } + } + } else { + Local value = Local::New(jsval); + if(jsval->IsString()) { + rval = GetSimpleJSValue(PN_STRING, value); + } else if(jsval->IsBoolean()) { + rval = GetSimpleJSValue(PN_BOOL, value); + } else if(jsval->IsNull()) { + rval = GetSimpleJSValue(PN_NULL, value); + } else { + // TODO -- BINARY, others? + } + } + + return rval; +} \ No newline at end of file diff --git a/src/protondata.h b/src/protondata.h new file mode 100644 index 0000000..d4af71c --- /dev/null +++ b/src/protondata.h @@ -0,0 +1,35 @@ +#ifndef _PROTONDATA_H_ +#define _PROTONDATA_H_ + +#include + +#include "proton/types.h" +#include "proton/codec.h" + +using namespace v8; +using namespace node; + +class ProtonData { + private: + static pn_type_t JSTypeToPNType(std::string type); + static std::string PNTypeToJSType(pn_type_t type); + static bool IsSimpleValue(pn_type_t type); + static Handle GetSimpleValue(pn_data_t *data); + static Handle GetDescribedValue(pn_data_t *data); + static Handle GetArrayValue(pn_data_t *data); + static Handle GetListValue(pn_data_t *data); + static Handle GetMapValue(pn_data_t *data); + static Handle ParseValue(pn_data_t *data); + static pn_data_t *GetSimpleJSValue(pn_type_t type, Local jsval); + static pn_data_t *GetDescribedJSValue(Handle array); + static pn_data_t *GetArrayOrListJSValue(pn_type_t type, Handle array); + static pn_data_t *GetArrayJSValue(Handle array); + static pn_data_t *GetListJSValue(Handle array); + static pn_data_t *GetMapJSValue(Handle array); + + public: + static Local ParsePnData(pn_data_t *data); + static pn_data_t *ParseJSData(Handle jsval); +}; + +#endif /* _PROTONDATA_H_ */ \ No newline at end of file diff --git a/src/sending.cc b/src/sending.cc new file mode 100644 index 0000000..17b420c --- /dev/null +++ b/src/sending.cc @@ -0,0 +1,253 @@ +#include +#include +#include + +#include "messenger.h" +#include "async.h" + +using namespace v8; +using namespace node; +using namespace std; + +Messenger::QueuedWorker::QueuedWorker(Messenger *m) : + msgr(m), + active(false) { + NODE_CPROTON_MUTEX_INIT(mutex); +}; + +Messenger::QueuedWorker::~QueuedWorker() { + std::list::iterator it = messageList.begin(); + NODE_CPROTON_MUTEX_LOCK(&mutex); + while(it != messageList.end()) { + InFlightMessage *msg = *it; + if(msg) { + delete(msg); + *it = NULL; + } + it = messageList.erase(it); + } + NODE_CPROTON_MUTEX_DESTROY(mutex); +}; + + +void Messenger::QueuedWorker::AppendMessage(InFlightMessage *msg) { + bool enableWorker; + NODE_CPROTON_MUTEX_LOCK(&mutex); + messageList.push_back(msg); + enableWorker = !active; + NODE_CPROTON_MUTEX_UNLOCK(&mutex); + + if(enableWorker) { + HandleQueue(); + } +} + +void Messenger::QueuedWorker::AppendMessages(std::list *srcList, + std::list::iterator begin, + std::list::iterator end, + NODE_CPROTON_MUTEX_t *srcMutex) { + bool enableWorker; + + NODE_CPROTON_MUTEX_LOCK(&mutex); + if(srcMutex) { + NODE_CPROTON_MUTEX_LOCK(srcMutex); + } + messageList.splice(messageList.end(), *srcList, begin, end); + enableWorker = !active; + if(srcMutex) { + NODE_CPROTON_MUTEX_UNLOCK(srcMutex); + } + NODE_CPROTON_MUTEX_UNLOCK(&mutex); + + if(enableWorker) { + HandleQueue(); + } +} + +void Messenger::MessageSender::HandleQueue(void) { + NODE_CPROTON_MUTEX_LOCK(&mutex); + if(!active) { + active = true; + uv_timer_init(uv_default_loop(), &activityTimer); + activityTimer.data = this; + uv_timer_start(&activityTimer, ProcessSending, 5, 0); + } + NODE_CPROTON_MUTEX_UNLOCK(&mutex); +} + +void Messenger::MessageSender::ProcessSending(uv_timer_t *handle, int status) { + MessageSender *sender = static_cast(handle->data); + pn_messenger_t* messenger = sender->msgr->messenger; + + // Determine how many messages can be sent in this iteration. + NODE_CPROTON_MUTEX_LOCK(&sender->mutex) + int outboundMessages = sender->messageList.size(); + int windowSize = sender->msgr->MessengerGetOutgoingWindow(); + int availableSlots = 0; + if(windowSize) { + int outgoing = sender->msgr->MessengerGetOutgoing(); + if(outgoing < windowSize) { + availableSlots = windowSize - outgoing; + } + } else { + // NOTE - if the window size is 0, then it will be impossible to track the + // status of a particular tracker. In that case (for settling purposes), + // PN_STATUS_UNKNOWN will not be an error. For sending, we will just try + // and send one message. + availableSlots = 1; + } + if(outboundMessages < availableSlots) { + availableSlots = outboundMessages; + } + NODE_CPROTON_MUTEX_UNLOCK(&sender->mutex) + + int err = 0; + std::list::iterator it = sender->messageList.begin(); + while(availableSlots) { + InFlightMessage *msg = *it; + if(msg) { + //err = pn_messenger_put( messenger, msg->pnmsg ); + sender->msgr->MessengerPut( msg->pnmsg ); + if(err) { + msg->error = err; + } else { + //msg->tracker = pn_messenger_outgoing_tracker( messenger ); + msg->tracker = sender->msgr->MessengerGetOutgoingTracker(); + } + } + availableSlots--; + it++; + } + + // messages sent? + if(it != sender->messageList.begin()) { + //err = pn_messenger_send(messenger, -1); + err = sender->msgr->MessengerSend(); + if(err) { + // TODO -- bubble this up + } + } else { + err = sender->msgr->MessengerWork(); + } + + if(it != sender->messageList.begin()) { + sender->msgr->messageSettler->AppendMessages(&sender->messageList, + sender->messageList.begin(), + it, + &sender->mutex); + } + + NODE_CPROTON_MUTEX_LOCK(&sender->mutex) + bool keepSending = ((sender->msgr->MessengerGetOutgoing() > 0) || (sender->messageList.size() > 0)); + sender->active = false; + NODE_CPROTON_MUTEX_UNLOCK(&sender->mutex) + if(keepSending) { + sender->HandleQueue(); + } +} + +void Messenger::MessageSettler::HandleQueue(void) { + NODE_CPROTON_MUTEX_LOCK(&mutex); + if(!active) { + active = true; + uv_timer_init(uv_default_loop(), &activityTimer); + activityTimer.data = this; + uv_timer_start(&activityTimer, ProcessSettling, 5, 0); + } + NODE_CPROTON_MUTEX_UNLOCK(&mutex); +} + +void Messenger::MessageSettler::ProcessSettling(uv_timer_t *handle, int status) { + HandleScope scope; + + MessageSettler *settler = static_cast(handle->data); + pn_messenger_t* messenger = settler->msgr->messenger; + + std::list::iterator it; + std::list abortedMessages; + + NODE_CPROTON_MUTEX_LOCK(&settler->mutex) + int needingSettling = settler->messageList.size(); + NODE_CPROTON_MUTEX_UNLOCK(&settler->mutex) + + it = settler->messageList.begin(); + while(needingSettling) { + bool processed = false; + + InFlightMessage *msg = *it; + if(msg) { + if(msg->error) { + // an error in sending + processed = true; + } else { + // once we hit a message that isn't buffered + bool buffered = settler->msgr->MessengerGetBuffered(msg->tracker); + pn_status_t status = settler->msgr->MessengerGetStatus(msg->tracker); + if(!buffered) { + // need to handle errors!!! + if((status == PN_STATUS_PENDING) || (status = PN_STATUS_SETTLED)) { + // consider message ok + } else { + if(status != PN_STATUS_SETTLED) { + msg->error = Messenger::MapPNStatusToError(status); + } + } + settler->msgr->MessengerSettleOutgoing(msg->tracker); + processed = true; + } else { + if(status != PN_STATUS_PENDING) { + settler->msgr->MessengerSettleOutgoing(msg->tracker); + if((status == PN_STATUS_ABORTED) && !msg->retried) { + msg->retried = true; + abortedMessages.push_back(msg); + *it = msg = NULL; + } else { + msg->error = Messenger::MapPNStatusToError(status); + } + processed = true; + } + // exit out of the loop since we reached a message still buffered + needingSettling = 0; + } + } + + if(msg && processed) { + if(!msg->callback.IsEmpty() && msg->callback->IsFunction()) { + if(msg->error) { + std::string errorString = Messenger::MapErrorToString(msg->error); + Local args[] = { String::New(errorString.c_str()), Local::New(msg->message) }; + msg->callback->Call(Context::GetCurrent()->Global(), 2, args); + } else { + Local args[] = { Local::New(Null()), Local::New(msg->message) }; + msg->callback->Call(Context::GetCurrent()->Global(), 2, args); + } + } + delete(msg); + } + } + + if(processed) { + if(needingSettling) { + needingSettling--; + } + it = settler->messageList.erase(it); + } + } + + if(abortedMessages.size() > 0) { + settler->msgr->messageSender->AppendMessages(&abortedMessages, + abortedMessages.begin(), + abortedMessages.end(), + NULL); + } + + NODE_CPROTON_MUTEX_LOCK(&settler->mutex) + //settler->messageList.erase(settler->messageList.begin(), it); + bool keepSending = (settler->messageList.size() > 0); + settler->active = false; + NODE_CPROTON_MUTEX_UNLOCK(&settler->mutex) + + if(keepSending) { + settler->HandleQueue(); + } +} \ No newline at end of file diff --git a/src/sending.h b/src/sending.h new file mode 100644 index 0000000..b83cacc --- /dev/null +++ b/src/sending.h @@ -0,0 +1,82 @@ +#ifndef SENDING_H +#define SENDING_H + +#include +#include +#include +#include + +#include + +#include "proton/message.h" + +class Messenger; + +struct InFlightMessage { + InFlightMessage(Handle msg_, pn_message_t *pnmsg_, Handle cb_) : + pnmsg(pnmsg_), + retried(false), + error(0) { + callback = Persistent::New(cb_); + message = Persistent::New(msg_); + }; + + ~InFlightMessage() { + if(pnmsg) { + pn_message_free(pnmsg); + } + message.Dispose(); + callback.Dispose(); + } + + Persistent message; + Persistent callback; + pn_message_t *pnmsg; + pn_tracker_t tracker; + bool retried; + int error; +}; + +struct QueuedWorker { + QueuedWorker(Messenger *m); + ~QueuedWorker(); + + void AppendMessage(InFlightMessage *msg); + void AppendMessages(std::list *srcList, + std::list::iterator begin, + std::list::iterator end, + NODE_CPROTON_MUTEX_t *srcMutex); + +protected: + virtual void HandleQueue(void) = 0; + + Messenger *msgr; + std::list messageList; + NODE_CPROTON_MUTEX_t(mutex); + bool active; + uv_timer_t activityTimer; +}; + +struct MessageSender : QueuedWorker { + MessageSender(Messenger *m) : + QueuedWorker(m) {}; + + ~MessageSender() {}; + +protected: + virtual void HandleQueue(void); + static void ProcessSending(uv_timer_t* handle, int status); +}; + +struct MessageSettler : QueuedWorker { + MessageSettler(Messenger *m) : + QueuedWorker(m) {}; + + ~MessageSettler() {}; + +protected: + virtual void HandleQueue(void); + static void ProcessSettling(uv_timer_t* handle, int status); +}; + +#endif /* SENDING_H */ \ No newline at end of file diff --git a/src/threading.h b/src/threading.h index 554ff84..798896a 100644 --- a/src/threading.h +++ b/src/threading.h @@ -12,27 +12,29 @@ #include - #define NODE_CPROTON_MUTEX_t HANDLE mutex; + #define NODE_CPROTON_MUTEX_t HANDLE - #define NODE_CPROTON_MUTEX_INIT CreateMutex(NULL, FALSE, NULL); + #define NODE_CPROTON_MUTEX_INIT(m) m = CreateMutex(NULL, FALSE, NULL); #define NODE_CPROTON_MUTEX_LOCK(m) WaitForSingleObject(m, INFINITE); #define NODE_CPROTON_MUTEX_UNLOCK(m) ReleaseMutex(m); - #define NODE_CPROTON_MUTEX_DESTROY CloseHandle(mutex); + #define NODE_CPROTON_MUTEX_DESTROY(m) CloseHandle(m); #else - #define NODE_CPROTON_MUTEX_t pthread_mutex_t mutex; +#include - #define NODE_CPROTON_MUTEX_INIT pthread_mutex_init(&mutex,NULL); + #define NODE_CPROTON_MUTEX_t pthread_mutex_t + + #define NODE_CPROTON_MUTEX_INIT(m) pthread_mutex_init(&m,NULL); #define NODE_CPROTON_MUTEX_LOCK(m) pthread_mutex_lock(m); #define NODE_CPROTON_MUTEX_UNLOCK(m) pthread_mutex_unlock(m); - #define NODE_CPROTON_MUTEX_DESTROY pthread_mutex_destroy(&mutex); + #define NODE_CPROTON_MUTEX_DESTROY(m) pthread_mutex_destroy(&m); #endif