diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 8bb80b3c3..648117b22 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -653,12 +653,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t * ****************************************************************************** */ -typedef enum { - QD_DETACHED, // Protocol detach - QD_CLOSED, // Protocol close - QD_LOST // Connection or session closed -} qd_detach_type_t; - /** * qdr_link_set_context * @@ -810,15 +804,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); /** - * qdr_link_detach + * qdr_link_detach_received * - * This function is invoked when a link detach arrives. + * This function is invoked when a link detach performative arrives from the remote peer. This may the first detach + * (peer-initiated link detach) or in response to a detach sent by the router (second detach). * * @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. - * @param dt The type of detach that occurred. * @param error The link error from the detach frame or 0 if none. */ -void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error); +void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error); + + +/** + * qdr_link_closed + * + * This function is invoked by the adaptor when the link has fully closed. This will be the last call made by the + * adaptor for this link. This may be called as a result of a successful detach handshake or due to link loss. This will + * also be called during adaptor shutdown on any outstanding links. + * + * The core may free the qdr_link_t by this call. The adaptor MUST NOT reference the qdr_link_t on return from this + * call. + * + * @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. + * @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake. + */ +void qdr_link_closed(qdr_link_t *link, bool forced); + /** * qdr_link_deliver diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 2a78c013d..172e21c2d 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv) return ref ? (qdr_delivery_t*) ref->ref : 0; } -// clean up all qdr_delivery/pn_delivery bindings for the link -// -void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link) -{ - qd_link_ref_list_t *list = qd_link_get_ref_list(link); - qd_link_ref_t *ref = DEQ_HEAD(*list); - - while (ref) { - qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; - pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); - assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); - - // this will remove and release the ref - qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv); - ref = DEQ_HEAD(*list); - } -} - // read the delivery-state set by the remote endpoint // @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) /** * Link Detached Handler */ -static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt) +static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link) { - if (!link) - return 0; + assert(link); pn_link_t *pn_link = qd_link_pn(link); if (!pn_link) @@ -1257,29 +1238,55 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det } } - qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); - pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; + // Notify the core that a detach has been received + qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link); if (rlink) { - // - // If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent - // connection/session loss then this is the last proton event that will be generated for this link. The qd_link - // will be freed on return from this call so remove the cross linkage between it and the qdr_link peer. - - if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) { - // note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is - // the second detach! - qd_link_set_context(link, 0); - qdr_link_set_context(rlink, 0); - } - - qdr_error_t *error = qdr_error_from_pn(cond); - qdr_link_detach(rlink, dt, error); + pn_condition_t *cond = pn_link_remote_condition(pn_link); + qdr_error_t *error = qdr_error_from_pn(cond); + qdr_link_detach_received(rlink, error); } return 0; } + +/** + * Link closed handler + * + * This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the + * link has not properly closed (detach handshake completed). +*/ +static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced) +{ + assert(qd_link); + + // Clean up all qdr_delivery/pn_delivery bindings for the link. + + qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link); + qd_link_ref_t *ref = DEQ_HEAD(*list); + + while (ref) { + qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref; + pn_delivery_t *pdlv = qdr_delivery_get_context(dlv); + assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv)); + + // This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call! + qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv); + ref = DEQ_HEAD(*list); + } + + qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link); + if (qdr_link) { + // Notify core that this link no longer exists + qdr_link_set_context(qdr_link, 0); + qd_link_set_context(qd_link, 0); + qdr_link_closed(qdr_link, forced); + // This will cause the core to free qdr_link at some point so: + qdr_link = 0; + } +} + static void bind_connection_context(qdr_connection_t *qdrc, void* token) { qd_connection_t *conn = (qd_connection_t*) token; @@ -1776,8 +1783,8 @@ static const qd_node_type_t router_node = {"router", 0, AMQP_outgoing_link_handler, AMQP_conn_wake_handler, AMQP_link_detach_handler, + AMQP_link_closed_handler, AMQP_link_attach_handler, - qd_link_abandoned_deliveries_handler, AMQP_link_flow_handler, 0, // node_created_handler 0, // node_destroyed_handler @@ -1920,7 +1927,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error return; pn_link_t *pn_link = qd_link_pn(qlink); - if (!pn_link) + if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached return; if (error) { @@ -1945,17 +1952,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error } } - // - // This is the last event for this link that the core is going to send into Proton so remove the core => adaptor - // linkage. If this is the response attach then there will be no further proton link events to send to the core so - // remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so - // we can notify the core when the second (response) detach arrives - // - qdr_link_set_context(link, 0); - if (!first) { - qd_link_set_context(qlink, 0); - } - qd_link_close(qlink); } diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index e18824496..b35ddb705 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -64,6 +64,9 @@ struct qd_link_t { ALLOC_DEFINE_SAFE(qd_link_t); ALLOC_DEFINE(qd_link_ref_t); +static void qd_link_free(qd_link_t *); + + /** Encapsulates a proton session */ struct qd_session_t { DEQ_LINKS(qd_session_t); @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void // The given connection has dropped. There will be no further link events for this connection so manually clean up all -// links +// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed. +// static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log) { pn_link_t *pn_link = pn_link_head(conn, 0); @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p if (print_log) qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end", pn_link_name(pn_link)); - container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); + container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced qd_link_free(qd_link); } @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link) // cleanup any inbound message that has not been forwarded qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg); if (msg) { + qd_nullify_safe_ptr(&link->incoming_msg); qd_message_free(msg); } } @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link) static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn) { // - // Close all links, passing QD_LOST as the reason. These links are not - // being properly 'detached'. They are being orphaned. + // Close all links. These links are not being properly 'detached'. They are being orphaned. // if (qd_conn) qd_conn->closed = true; @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { - // Remote has nuked our session. Check for any links that were - // left open and forcibly detach them, since no detaches will - // arrive on this session. + // Remote has closed the session. Check for any child links and forcibly close them since there will be + // no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton + // will free all child pn_link_t when it frees the session. pn_link = pn_link_head(conn, 0); while (pn_link) { pn_link_t *next_link = pn_link_next(pn_link, 0); @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent session end", pn_link_name(pn_link)); - container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST); + container->ntype->link_closed_handler(container->qd_router, qd_link, true); qd_link_free(qd_link); } } @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_link = pn_event_link(event); qd_link = (qd_link_t*) pn_link_get_context(pn_link); if (qd_link) { - qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED; - if (qd_link->pn_link == pn_link) { - pn_link_close(pn_link); - } if (qd_link->policy_counted) { qd_link->policy_counted = false; if (pn_link_is_sender(pn_link)) { @@ -609,16 +609,21 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, } } - container->ntype->link_detach_handler(container->qd_router, qd_link, dt); + // notify arrival of inbound detach + container->ntype->link_detach_handler(container->qd_router, qd_link); - if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) { - // link fully closed - add_link_to_free_list(&qd_conn->free_link_list, pn_link); + if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + // Link now fully detached + container->ntype->link_closed_handler(container->qd_router, qd_link, false); qd_link_free(qd_link); + add_link_to_free_list(&qd_conn->free_link_list, pn_link); + } + } else { // no qd_link, manually detach or free + if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) { + pn_link_close(pn_link); + } else { + add_link_to_free_list(&qd_conn->free_link_list, pn_link); } - - } else { - add_link_to_free_list(&qd_conn->free_link_list, pn_link); } } break; @@ -626,8 +631,13 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, case PN_LINK_LOCAL_CLOSE: pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - add_link_to_free_list(&qd_conn->free_link_list, pn_link); - qd_link_free((qd_link_t *) pn_link_get_context(pn_link)); + qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link); + if (qd_link) { + // Link now fully detached + container->ntype->link_closed_handler(container->qd_router, qd_link, false); + qd_link_free(qd_link); + } + add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why??? } break; @@ -775,7 +785,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, } -void qd_link_free(qd_link_t *link) +static void qd_link_free(qd_link_t *link) { if (!link) return; @@ -783,8 +793,6 @@ void qd_link_free(qd_link_t *link) DEQ_REMOVE(amqp_adaptor.container->links, link); sys_mutex_unlock(&amqp_adaptor.container->lock); - amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link); - cleanup_link(link); free_qd_link_t(link); } diff --git a/src/adaptors/amqp/container.h b/src/adaptors/amqp/container.h index c133c9671..026956403 100644 --- a/src/adaptors/amqp/container.h +++ b/src/adaptors/amqp/container.h @@ -68,7 +68,6 @@ qd_container_t *qd_container(qd_router_t *router, const qd_node_type_t *node_typ void qd_container_free(qd_container_t *container); qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t); -void qd_link_free(qd_link_t *link); /** * List of reference in the qd_link used to track abandoned deliveries @@ -98,7 +97,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link); pn_terminus_t *qd_link_remote_source(qd_link_t *link); pn_terminus_t *qd_link_remote_target(qd_link_t *link); void qd_link_close(qd_link_t *link); -void qd_link_free(qd_link_t *link); void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context); void qd_link_q3_block(qd_link_t *link); void qd_link_q3_unblock(qd_link_t *link); diff --git a/src/adaptors/amqp/node_type.h b/src/adaptors/amqp/node_type.h index 1308ee404..b5bbcc448 100644 --- a/src/adaptors/amqp/node_type.h +++ b/src/adaptors/amqp/node_type.h @@ -26,10 +26,10 @@ typedef struct qd_router_t qd_router_t; typedef bool (*qd_container_delivery_handler_t) (qd_router_t *, qd_link_t *link); typedef void (*qd_container_disposition_handler_t) (qd_router_t *, qd_link_t *link, pn_delivery_t *pnd); typedef int (*qd_container_link_handler_t) (qd_router_t *, qd_link_t *link); -typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link, qd_detach_type_t dt); +typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link); +typedef void (*qd_container_link_closed_handler_t) (qd_router_t *, qd_link_t *link, bool forced); typedef void (*qd_container_node_handler_t) (qd_router_t *); typedef int (*qd_container_conn_handler_t) (qd_router_t *, qd_connection_t *conn, void *context); -typedef void (*qd_container_link_abandoned_deliveries_handler_t) (qd_router_t *, qd_link_t *link); /** * A set of Node handlers for deliveries, links and container events. @@ -57,15 +57,20 @@ struct qd_node_type_t { /** Invoked when an activated connection is available for writing. */ qd_container_conn_handler_t writable_handler; - /** Invoked when a link is detached. */ + /** Invoked when link detached is received. */ qd_container_link_detach_handler_t link_detach_handler; + + /** The last callback issued for the given qd_link_t. The adaptor must clean up all state related to the qd_link_t + * as it will be freed on return from this call. The forced flag is set to true if the link is being forced closed + * due to the parent connection/session closing or on shutdown. + */ + qd_container_link_closed_handler_t link_closed_handler; + ///@} /** Invoked when a link we created was opened by the peer */ qd_container_link_handler_t link_attach_handler; - qd_container_link_abandoned_deliveries_handler_t link_abandoned_deliveries_handler; - /** Invoked when a link receives a flow event */ qd_container_link_handler_t link_flow_handler; diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 00d9a80b4..d3ecb885d 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -628,7 +628,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) } if (!!conn->inbound_link) { - qdr_link_detach(conn->inbound_link, QD_LOST, 0); + qdr_link_closed(conn->inbound_link, true); } if (!!conn->outbound_delivery) { @@ -638,7 +638,7 @@ static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn) } if (!!conn->outbound_link) { - qdr_link_detach(conn->outbound_link, QD_LOST, 0); + qdr_link_closed(conn->outbound_link, true); } if (conn->observer_handle) { @@ -2501,7 +2501,7 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) // if (!!connector->out_link) { qdr_link_set_context(connector->out_link, 0); - qdr_link_detach(connector->out_link, QD_LOST, 0); + qdr_link_closed(connector->out_link, true); connector->out_link = 0; } diff --git a/src/router_core/connections.c b/src/router_core/connections.c index c7977dba6..c340198b1 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -36,9 +36,8 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_detach_sent(qdr_link_t *link); static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link); static void qdr_connection_group_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn); static void qdr_connection_set_tracing_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -500,7 +499,7 @@ int qdr_connection_process(qdr_connection_t *conn) } sys_mutex_lock(&conn->work_lock); - if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { + if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0) { // link_work ref transferred from link_work to work_list DEQ_INSERT_HEAD(link->work_list, link_work); link_work->processing = false; @@ -518,11 +517,9 @@ int qdr_connection_process(qdr_connection_t *conn) event_count++; } - if (detach_sent) { - // let the core thread know so it can clean up - qdr_link_detach_sent(link); - } else + if (!detach_sent) { qdr_record_link_credit(core, link); + } ref = DEQ_NEXT(ref); } @@ -697,6 +694,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, strcpy(link->name, name); link->link_direction = dir; + link->state = QDR_LINK_STATE_UNINIT; // transition to first attach occurs on core thread link->capacity = conn->link_capacity; link->credit_pending = conn->link_capacity; link->admin_enabled = true; @@ -764,25 +762,23 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin } -void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error) +void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error) { - qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach"); + qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach_received"); set_safe_ptr_qdr_connection_t(link->conn, &action->args.connection.conn); set_safe_ptr_qdr_link_t(link, &action->args.connection.link); action->args.connection.error = error; - action->args.connection.dt = dt; qdr_action_enqueue(link->core, action); } -/* let the core thread know that a dispatch has been sent by the I/O thread - */ -static void qdr_link_detach_sent(qdr_link_t *link) +void qdr_link_closed(qdr_link_t *link, bool forced) { - qdr_action_t *action = qdr_action(qdr_link_detach_sent_CT, "link_detach_sent"); + qdr_action_t *action = qdr_action(qdr_link_closed_CT, "link_closed"); set_safe_ptr_qdr_link_t(link, &action->args.connection.link); + action->args.connection.forced_close = forced; qdr_action_enqueue(link->core, action); } @@ -1213,6 +1209,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->conn_id = conn->identity; link->link_type = link_type; link->link_direction = dir; + link->state = QDR_LINK_STATE_ATTACH_SENT; link->capacity = conn->link_capacity; link->credit_pending = conn->link_capacity; link->name = (char*) malloc(QD_DISCRIMINATOR_SIZE + 8); @@ -1223,7 +1220,6 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->oper_status = QDR_LINK_OPER_DOWN; link->insert_prefix = 0; link->strip_prefix = 0; - link->attach_count = 1; link->core_ticks = qdr_core_uptime_ticks(core); link->zero_credit_time = link->core_ticks; link->priority = priority; @@ -1267,8 +1263,11 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, } -void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close) +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition) { + assert((link->state & QDR_LINK_STATE_DETACH_SENT) == 0); + link->state |= QDR_LINK_STATE_DETACH_SENT; + // // Ensure a pooled link is no longer available for streaming messages // @@ -1283,8 +1282,8 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t // tell the I/O thread to do the detach // - link->detach_count += 1; - qdr_link_work_t *work = qdr_link_work(link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH); + bool first_detach = (link->state & QDR_LINK_STATE_DETACH_RECVD) == 0; // haven't received a detach + qdr_link_work_t *work = qdr_link_work(first_detach ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH); if (error) work->error = error; @@ -2034,12 +2033,13 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act return; } - qd_direction_t dir = action->args.connection.dir; + qd_direction_t dir = action->args.connection.dir; // - // Start the attach count. + // Expect this is the initial attach (remote initiated link) // - link->attach_count = 1; + assert((link->state & QDR_LINK_STATE_ATTACH_SENT) == 0); + link->state |= QDR_LINK_STATE_ATTACH_RECVD; // // Put the link into the proper lists for tracking. @@ -2058,7 +2058,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)) { link->link_type = QD_LINK_ENDPOINT; // Demote the link type to endpoint if this is not an inter-router connection - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2073,7 +2073,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT && core->control_links_by_mask_bit[conn->mask_bit] == 0) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2117,7 +2117,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (core->addr_lookup_handler) core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target); else { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2156,7 +2156,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (core->addr_lookup_handler) core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target); else { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); qdr_terminus_free(source); qdr_terminus_free(target); qd_log(LOG_ROUTER_CORE, QD_LOG_ERROR, @@ -2203,8 +2203,13 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac return; } + // expect: called due to an attach received as a response to our sent attach + // + assert(!!(link->state & QDR_LINK_STATE_ATTACH_SENT)); + link->state |= QDR_LINK_STATE_ATTACH_RECVD; + link->oper_status = QDR_LINK_OPER_UP; - link->attach_count++; + // // Mark the link as an edge link if it's inside an edge connection. @@ -2289,28 +2294,13 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac } -static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +// Perform all detach-related link processing. +// +// error: (optional) error information that arrived in the detach performative +// +static void qdr_link_process_detach(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error) { - qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn); - qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); - qdr_error_t *error = action->args.connection.error; - qd_detach_type_t dt = action->args.connection.dt; - - if (discard || !conn || !link) { - qdr_error_free(error); - return; - } - - if (link->detach_received) - return; - - link->detach_received = true; - ++link->detach_count; - - if (link->core_endpoint) { - qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error, dt); - return; - } + qdr_connection_t *conn = link->conn; // // ensure a pooled link is no longer available for use @@ -2329,8 +2319,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b qdr_route_auto_link_detached_CT(core, link, error); } - - qdr_address_t *addr = link->owning_addr; if (addr) addr->ref_count++; @@ -2404,26 +2392,10 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b link->owning_addr = 0; - if (link->detach_count == 1) { - // - // Handle the disposition of any deliveries that remain on the link - // - qdr_link_cleanup_deliveries_CT(core, conn, link, false); - - // - // If the detach occurred via protocol, send a detach back. - // - if (dt != QD_LOST) { - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, dt == QD_CLOSED); - } else { - // no detach can be sent out because the connection was lost - qdr_link_cleanup_protected_CT(core, conn, link, "Link lost"); - } - } else if (link->detach_send_done) { // detach count indicates detach has been scheduled - // I/O thread is finished sending detach, ok to free link now - - qdr_link_cleanup_protected_CT(core, conn, link, "Link detached"); - } + // + // Handle the disposition of any deliveries that remain on the link + // + qdr_link_cleanup_deliveries_CT(core, link->conn, link, false); // // If there was an address associated with this link, check to see if any address-related @@ -2433,24 +2405,78 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b addr->ref_count--; qdr_check_addr_CT(core, addr); } +} + + +static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn); + qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); + qdr_error_t *error = action->args.connection.error; + + if (discard || !conn || !link) { + qdr_error_free(error); + return; + } + + if (link->state & QDR_LINK_STATE_DETACH_RECVD) + return; + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_link_inbound_detach_CT()", + conn->identity, link->identity); + + link->state |= QDR_LINK_STATE_DETACH_RECVD; + + const bool first_detach = (link->state & QDR_LINK_STATE_DETACH_SENT) == 0; + + if (link->core_endpoint) { + qdrc_endpoint_do_detach_CT(core, link->core_endpoint, error, first_detach); + return; + } + + qdr_link_process_detach(core, link, error); + + if (first_detach) { + // Send response detach + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE); + } if (error) qdr_error_free(error); } -/* invoked on core thread to signal that the I/O thread has sent the detach - */ -static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); + qdr_link_t *link = safe_deref_qdr_link_t(action->args.connection.link); + bool forced_close = action->args.connection.forced_close; if (discard || !link) return; - link->detach_send_done = true; - if (link->conn && link->detach_received) - qdr_link_cleanup_protected_CT(core, link->conn, link, "Link detached"); + if (forced_close) { + // The link has been forced closed rather than cleanly detached. + if ((link->state & QDR_LINK_STATE_DETACH_RECVD) == 0) { + // detach-related cleanup was not done - do it now + if (link->core_endpoint) { + bool first_detach = (link->state & QDR_LINK_STATE_DETACH_SENT) == 0; + qdrc_endpoint_do_detach_CT(core, link->core_endpoint, 0, first_detach); + return; + } + + qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_link_closed_CT(forced=%s) handle %s detach", + link->conn->identity, link->identity, forced_close ? "YES" : "NO", + (link->state & QDR_LINK_STATE_DETACH_SENT) == 0 ? "first" : "second"); + + qdr_link_process_detach(core, link, 0); + } + } + + qdr_link_cleanup_protected_CT(core, link->conn, link, + action->args.connection.forced_close + ? "Link forced closed" : "Link closed"); } diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index 8c4e83ebe..f70b54f6e 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -100,8 +100,8 @@ void qdrc_endpoint_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_t void qdrc_endpoint_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error) { - qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE, true); - if (ep->link->detach_count == 2) { + qdr_link_outbound_detach_CT(core, ep->link, error, QDR_CONDITION_NONE); + if (QDR_LINK_STATE_IS_CLOSED(ep->link->state)) { qdrc_endpoint_do_cleanup_CT(core, ep); } } @@ -213,17 +213,13 @@ void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *ep, int credit, } -void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error, qd_detach_type_t dt) +void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_error_t *error, bool first_detach) { - if (dt == QD_LOST) { - qdrc_endpoint_do_cleanup_CT(core, ep); - qdr_error_free(error); - - } else if (ep->link->detach_count == 1) { + if (first_detach) { if (!!ep->desc->on_first_detach) ep->desc->on_first_detach(ep->link_context, error); else { - qdr_link_outbound_detach_CT(core, ep->link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(core, ep->link, 0, QDR_CONDITION_NONE); qdr_error_free(error); } } else { diff --git a/src/router_core/core_link_endpoint.h b/src/router_core/core_link_endpoint.h index 59ec76da7..7e8b65c0a 100644 --- a/src/router_core/core_link_endpoint.h +++ b/src/router_core/core_link_endpoint.h @@ -193,7 +193,7 @@ qd_direction_t qdrc_endpoint_get_direction_CT(const qdrc_endpoint_t *endpoint qdr_connection_t *qdrc_endpoint_get_connection_CT(qdrc_endpoint_t *endpoint); /** - * Detach a link attached to the core-endpoint + * Respond to a link attach to the core-endpoint. Typically called by the on_first_attach callback. * * @param core Pointer to the core object * @param endpoint Pointer to an endpoint object @@ -261,7 +261,7 @@ void qdrc_endpoint_do_second_attach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoi void qdrc_endpoint_do_deliver_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery); void qdrc_endpoint_do_update_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_delivery_t *delivery, bool settled); void qdrc_endpoint_do_flow_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, int credit, bool drain); -void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error, qd_detach_type_t dt); +void qdrc_endpoint_do_detach_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint, qdr_error_t *error, bool first_detach); void qdrc_endpoint_do_cleanup_CT(qdr_core_t *core, qdrc_endpoint_t *endpoint); #endif diff --git a/src/router_core/modules/address_lookup_client/address_lookup_client.c b/src/router_core/modules/address_lookup_client/address_lookup_client.c index ede897c08..5fc812a9c 100644 --- a/src/router_core/modules/address_lookup_client/address_lookup_client.c +++ b/src/router_core/modules/address_lookup_client/address_lookup_client.c @@ -234,13 +234,13 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core, source = target = 0; // ownership passed to qdrc_endpoint_do_bound_attach_CT } else if (unavailable) { - qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true); + qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0); } else if (!addr) { // // No route to this destination, reject the link // - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); } else { // // Prior to binding, check to see if this is an inter-edge connection. If so, diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 4d7e228c0..8fb1118cb 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -117,7 +117,7 @@ static void del_inlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) if (link) { qd_nullify_safe_ptr(&addr->edge_inlink_sp); qdr_core_unbind_address_link_CT(ap->core, addr, link); - qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE); } } @@ -148,7 +148,7 @@ static void del_outlink(qcm_edge_addr_proxy_t *ap, qdr_address_t *addr) if (link) { qd_nullify_safe_ptr(&addr->edge_outlink_sp); qdr_core_unbind_address_link_CT(ap->core, addr, link); - qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE); } } @@ -184,7 +184,7 @@ static void remove_proxies_for_addr(qcm_edge_addr_proxy_t *ap, qdr_address_t *ad qdr_link_t *link = ref->link; if (link->conn && link->conn->role == QDR_ROLE_INTER_EDGE) { qdr_core_unbind_address_link_CT(ap->core, addr, link); - qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE); } ref = next; } diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c index 34981d00c..ad6065569 100644 --- a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c +++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c @@ -91,7 +91,7 @@ static void idle_link_cleanup(qdr_core_t *core, qdr_connection_t *conn) qd_log(LOG_ROUTER_CORE, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] Streaming link scrubber: closing idle link %s", link->conn->identity, link->identity, (link->name) ? link->name : ""); - qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NONE); } } } diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index abb45885a..40d501d19 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -194,7 +194,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, q qdr_route_log_CT(core, "Auto Link Deactivated", al->name, al->identity, conn); if (al->link) { - qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE, true); + qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE); al->link->auto_link = 0; al->link = 0; } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index df69d8a6d..2487acd6a 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -126,10 +126,10 @@ struct qdr_action_t { qdr_terminus_t *source; qdr_terminus_t *target; qdr_error_t *error; - qd_detach_type_t dt; int credit; bool drain; bool enable_protocol_trace; + bool forced_close; qdr_delivery_t *initial_delivery; } connection; @@ -428,6 +428,22 @@ typedef enum { QDR_LINK_OPER_IDLE } qdr_link_oper_status_t; +typedef enum { + QDR_LINK_STATE_UNINIT = 0x00, + QDR_LINK_STATE_ATTACH_RECVD = 0x01, + QDR_LINK_STATE_ATTACH_SENT = 0x02, + QDR_LINK_STATE_DETACH_RECVD = 0x04, + QDR_LINK_STATE_DETACH_SENT = 0x08, + QDR_LINK_STATE_MASK = 0x0F +} qdr_link_state_t; + +// Link Open: both sides attach (and no detaches yet) +#define QDR_LINK_STATE_IS_OPEN(LS) (!!(((LS) & QDR_LINK_STATE_MASK) == \ + (QDR_LINK_STATE_ATTACH_RECVD | QDR_LINK_STATE_ATTACH_SENT))) +// Link Closed: both sides detach +#define QDR_LINK_STATE_IS_CLOSED(LS) (!!(((LS) & (QDR_LINK_STATE_DETACH_RECVD | QDR_LINK_STATE_DETACH_SENT)) \ + == (QDR_LINK_STATE_DETACH_RECVD | QDR_LINK_STATE_DETACH_SENT))) + #define QDR_LINK_RATE_DEPTH 5 struct qdr_link_t { @@ -439,12 +455,11 @@ struct qdr_link_t { qdr_connection_t *conn; ///< [ref] Connection that owns this link qd_link_type_t link_type; qd_direction_t link_direction; + qdr_link_state_t state; qdr_link_work_list_t work_list; char *name; char *disambiguated_name; char *terminus_addr; - int attach_count; ///< 1 or 2 depending on the state of the lifecycle - int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle uint32_t open_moved_streams; ///< Number of still-open streaming deliveries that were moved from this link qdr_address_t *owning_addr; ///< [ref] Address record that owns this link qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint @@ -467,8 +482,6 @@ struct qdr_link_t { bool strip_annotations_out; bool drain_mode; bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure - bool detach_received; ///< True on core receipt of inbound attach - bool detach_send_done; ///< True once the detach has been sent by the I/O thread bool edge; ///< True if this link is in an edge-connection bool processing; ///< True if an IO thread is currently handling this link bool ready_to_free; ///< True if the core thread wanted to clean up the link but it was processing @@ -992,7 +1005,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qd_session_class_t ssn_class, uint8_t priority); -void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close); +void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition); void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); bool qdr_link_is_idle_CT(const qdr_link_t *link); qdr_terminus_t *qdr_terminus_router_control(void); ///< new terminus for router control links diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index b44798686..5e67b4c1e 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -152,10 +152,6 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) if (link->link_direction == QD_OUTGOING) { - // If a detach has been received on the link, there is no need to process deliveries on the link. - if (link->detach_received) - return 0; - while (credit > 0) { sys_mutex_lock(&conn->work_lock); dlv = DEQ_HEAD(link->undelivered); @@ -432,10 +428,10 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar qdrc_endpoint_do_flow_CT(core, link->core_endpoint, credit, drain); } - if (link->attach_count == 1) + if (!QDR_LINK_STATE_IS_OPEN(link->state)) // - // The link is half-open. Store the pending credit to be dealt with once the link is - // progressed to the next step. + // The link is not fully open. Store the pending credit to be dealt with once the link has + // reached the open state. // link->credit_stored += credit; diff --git a/tests/test-receiver.c b/tests/test-receiver.c index 46377ffbd..77d7120a2 100644 --- a/tests/test-receiver.c +++ b/tests/test-receiver.c @@ -196,7 +196,6 @@ static bool event_handler(pn_event_t *event) case PN_PROACTOR_INACTIVE: case PN_PROACTOR_INTERRUPT: { - assert(stop); // expect: due to stopping debug("proactor inactive!\n"); return true; } break; diff --git a/tests/test-sender.c b/tests/test-sender.c index 2646d00e0..baaf300a1 100644 --- a/tests/test-sender.c +++ b/tests/test-sender.c @@ -88,9 +88,6 @@ char *host_address = _addr; char *container_name = "TestSender"; char proactor_address[1024]; -pn_connection_t *pn_conn; -pn_session_t *pn_ssn; -pn_link_t *pn_link; pn_proactor_t *proactor; pn_message_t *out_message; @@ -253,6 +250,7 @@ static bool event_handler(pn_event_t *event) case PN_CONNECTION_INIT: { // Create and open all the endpoints needed to send a message // + pn_connection_t *pn_conn = pn_event_connection(event); pn_connection_open(pn_conn); pn_session_t *pn_ssn = pn_session(pn_conn); pn_session_open(pn_ssn); @@ -267,30 +265,31 @@ static bool event_handler(pn_event_t *event) } break; - case PN_CONNECTION_WAKE: { - if (stop) { - pn_proactor_cancel_timeout(proactor); - if (drop_connection) { // hard stop - if (verbose) { - fprintf(stdout, - "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 - " Released:%"PRIu64" Modified:%"PRIu64"\n", - count, accepted, rejected, released, modified); - fflush(stdout); - } - exit(0); - } - if (pn_conn) { - debug("Stop detected - closing connection...\n"); - if (pn_link) pn_link_close(pn_link); - if (pn_ssn) pn_session_close(pn_ssn); - pn_connection_close(pn_conn); - pn_link = 0; - pn_ssn = 0; - pn_conn = 0; - } + case PN_LINK_REMOTE_CLOSE: { + pn_link_t *pn_link = pn_event_link(event); + if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + pn_session_close(pn_link_session(pn_link)); + pn_link_free(pn_link); } - } break; + break; + } + + case PN_SESSION_REMOTE_CLOSE: { + pn_session_t *pn_session = pn_event_session(event); + if (pn_session_state(pn_session) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + pn_connection_close(pn_session_connection(pn_session)); + pn_session_free(pn_session); + } + break; + } + + case PN_CONNECTION_REMOTE_CLOSE: { + pn_connection_t *pn_conn = pn_event_connection(event); + if (pn_connection_state(pn_conn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { + pn_conn = 0; + } + break; + } case PN_LINK_FLOW: { // the remote has given us some credit, now we can send messages @@ -315,7 +314,19 @@ static bool event_handler(pn_event_t *event) // no need to wait for acks debug("stopping (presettled)...\n"); stop = true; - pn_connection_wake(pn_conn); + pn_proactor_cancel_timeout(proactor); + if (drop_connection) { // hard stop + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64"\n", + count, accepted, rejected, released, modified); + fflush(stdout); + } + exit(0); + } else { // graceful stop + pn_link_close(sender); + } } } } @@ -357,29 +368,40 @@ static bool event_handler(pn_event_t *event) if (limit && acked == limit) { // initiate clean shutdown of the endpoints - debug("stopping...\n"); + debug("Done sending\n"); stop = true; - pn_connection_wake(pn_conn); + pn_proactor_cancel_timeout(proactor); + if (drop_connection) { // hard stop + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64"\n", + count, accepted, rejected, released, modified); + fflush(stdout); + } + exit(0); + } else { // graceful stop + pn_link_close(pn_event_link(event)); + } } } } break; case PN_PROACTOR_TIMEOUT: { - if (verbose) { - fprintf(stdout, - "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 - " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", - count, accepted, rejected, released, modified, limit); - fflush(stdout); - if (!stop) { - pn_proactor_set_timeout(proactor, 10 * 1000); + if (!stop) { + if (verbose) { + fprintf(stdout, + "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64 + " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n", + count, accepted, rejected, released, modified, limit); + fflush(stdout); } + pn_proactor_set_timeout(proactor, 10 * 1000); } } break; case PN_PROACTOR_INACTIVE: case PN_PROACTOR_INTERRUPT: { - assert(stop); // expect: due to stopping debug("proactor inactive!\n"); return true; } break; @@ -469,7 +491,7 @@ int main(int argc, char** argv) port = "5672"; } - pn_conn = pn_connection(); + pn_connection_t *pn_conn = pn_connection(); // the container name should be unique for each client pn_connection_set_container(pn_conn, container_name); pn_connection_set_hostname(pn_conn, host);