diff --git a/core/modules/port_inc.cc b/core/modules/port_inc.cc index acdb6b1b6..57fcbc623 100644 --- a/core/modules/port_inc.cc +++ b/core/modules/port_inc.cc @@ -44,6 +44,9 @@ CommandResponse PortInc::Init(const bess::pb::PortIncArg &arg) { int ret; CommandResponse err; placement_constraint placement; + value_t value; + mask_t mask = {0}; + std::string name = "ifName"; burst_ = bess::PacketBatch::kMaxBurst; @@ -86,6 +89,22 @@ CommandResponse PortInc::Init(const bess::pb::PortIncArg &arg) { return CommandFailure(-ret); } + ret = AddMetadataAttr("ifIndex", sizeof(uint64_t), + bess::metadata::Attribute::AccessMode::kWrite); + + uint64_t port_ifindex = port_->get_ifIndex(); + + std::memcpy(&value, &port_ifindex, sizeof(uint64_t)); + + /* We may add more "metadata at source" here later */ + attrs_.push_back({.name = name, + .value = value, + .mask = mask, + .offset = -1, + .size = sizeof(uint64_t), + .do_mask = false, + .shift = 0}); + return CommandSuccess(); } @@ -136,18 +155,29 @@ struct task_result PortInc::RunTask(Context *ctx, bess::PacketBatch *batch, return {.block = true, .packets = 0, .bits = 0}; } - // NOTE: we cannot skip this step since it might be used by scheduler. - if (prefetch_) { - for (uint32_t i = 0; i < cnt; i++) { - received_bytes += batch->pkts()[i]->total_len(); - rte_prefetch0(batch->pkts()[i]->head_data()); + for (uint32_t i = 0; i < cnt; i++) { + received_bytes += batch->pkts()[i]->total_len(); + if (prefetch_) { + rte_prefetch0(batch->pkts()[i]->head_data()); } - } else { - for (uint32_t i = 0; i < cnt; i++) { - received_bytes += batch->pkts()[i]->total_len(); + bess::Packet *snb = batch->pkts()[i]; + for (size_t k = 0; k < attrs_.size(); k++) { + const struct Attr *attr = &attrs_[k]; + mt_offset_t mt_offset = attr_offset(k); + + if (!bess::metadata::IsValidOffset(mt_offset)) { + continue; + } + + void *mt_ptr = _ptr_attr_with_offset(mt_offset, snb); + const void *val_ptr = &attr->value; + + bess::utils::CopySmall(mt_ptr, val_ptr, attr->size); } } + // NOTE: we cannot skip this step since it might be used by scheduler. + if (!(p->GetFlags() & DRIVER_FLAG_SELF_INC_STATS)) { p->queue_stats[PACKET_DIR_INC][qid].packets += cnt; p->queue_stats[PACKET_DIR_INC][qid].bytes += received_bytes; diff --git a/core/modules/port_inc.h b/core/modules/port_inc.h index d28737420..384196e6f 100644 --- a/core/modules/port_inc.h +++ b/core/modules/port_inc.h @@ -35,6 +35,27 @@ #include "../pb/module_msg.pb.h" #include "../port.h" + +using bess::metadata::kMetadataAttrMaxSize; +using bess::metadata::mt_offset_t; + +typedef struct { + uint8_t bytes[kMetadataAttrMaxSize]; +} value_t; +typedef struct { + uint8_t bytes[kMetadataAttrMaxSize]; +} mask_t; + +struct Attr { + std::string name; + value_t value; + mask_t mask; + int offset; + size_t size; + bool do_mask; + int shift; // in bytes for now +}; + class PortInc final : public Module { public: static const gate_idx_t kNumIGates = 0; @@ -63,6 +84,7 @@ class PortInc final : public Module { Port *port_; int prefetch_; int burst_; + std::vector attrs_; }; #endif // BESS_MODULES_PORTINC_H_ diff --git a/core/modules/port_out.cc b/core/modules/port_out.cc index d798a71a3..7939a2080 100644 --- a/core/modules/port_out.cc +++ b/core/modules/port_out.cc @@ -65,16 +65,27 @@ CommandResponse PortOut::Init(const bess::pb::PortOutArg &arg) { mcs_lock_init(&queue_locks_[i]); } + if (arg.rpfcheck()) { + rpfcheck_ = 1; + } else { + rpfcheck_ = 0; + } + if (ret < 0) { return CommandFailure(-ret); } + std::string attr_name = "ifIndex"; + using AccessMode = bess::metadata::Attribute::AccessMode; + attr_id_ = AddMetadataAttr(attr_name, sizeof(uint64_t), AccessMode::kRead); + return CommandSuccess(); } CommandResponse PortOut::GetInitialArg(const bess::pb::EmptyArg &) { bess::pb::PortOutArg arg; arg.set_port(port_->name()); + arg.set_rpfcheck(rpfcheck_); return CommandSuccess(arg); } @@ -90,42 +101,76 @@ std::string PortOut::GetDesc() const { port_->port_builder()->class_name().c_str()); } -static inline int SendBatch(bess::PacketBatch *batch, Port *p, queue_t qid) { +int PortOut::SendBatch(bess::PacketBatch *batch, queue_t qid) { uint64_t sent_bytes = 0; int sent_pkts = 0; - - if (p->conf().admin_up) { - sent_pkts = p->SendPackets(qid, batch->pkts(), batch->cnt()); + int dropped = 0; + int cnt = batch->cnt(); + + if (port_->conf().admin_up) { + if ((!rpfcheck_) || (attr_id_ == -1)) { + /* no RPF check - send everything at once */ + sent_pkts = port_->SendPackets(qid, batch->pkts(), batch->cnt()); + } else { + bool need_flush = true; + for (int i = 0; i < cnt; i++) { + uint64_t ifIndex = 0; + ifIndex = get_attr(this, attr_id_, batch->pkts()[i]); + if ((ifIndex > 0) && (ifIndex == port_->get_ifIndex())) { + /* need a drop - send all packets till now, + * free this packet */ + if (i - sent_pkts > 0) { + sent_pkts += + port_->SendPackets(qid, &batch->pkts()[sent_pkts], i - sent_pkts); + } + if (sent_pkts == i) { + /* send sucessful, drop packet failing rpf check*/ + bess::Packet::Free(batch->pkts()[i]); + /* Add dropped packet to "sent" count. We will subtract it later. + */ + sent_pkts++; + dropped++; + /* move to next packet */ + } else { + /* remove from the total the length of packets we failed to send */ + for (int k = sent_pkts; k < i; k++) { + sent_bytes -= batch->pkts()[k]->total_len(); + } + need_flush = false; + break; + } + } else { + sent_bytes += batch->pkts()[i]->total_len(); + } + } + if (need_flush && (sent_pkts < cnt)) { + sent_pkts += port_->SendPackets(qid, &batch->pkts()[sent_pkts], cnt - sent_pkts); + } + } } - if (!(p->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) { + if (!(port_->GetFlags() & DRIVER_FLAG_SELF_OUT_STATS)) { const packet_dir_t dir = PACKET_DIR_OUT; - - for (int i = 0; i < sent_pkts; i++) { - sent_bytes += batch->pkts()[i]->total_len(); - } - - p->queue_stats[dir][qid].packets += sent_pkts; - p->queue_stats[dir][qid].dropped += (batch->cnt() - sent_pkts); - p->queue_stats[dir][qid].bytes += sent_bytes; + port_->queue_stats[dir][qid].packets += sent_pkts - dropped; + port_->queue_stats[dir][qid].dropped += cnt - sent_pkts + dropped; + port_->queue_stats[dir][qid].bytes += sent_bytes; } - return sent_pkts; } void PortOut::ProcessBatch(Context *ctx, bess::PacketBatch *batch) { - Port *p = port_; CHECK(worker_queues_[ctx->wid] >= 0); queue_t qid = worker_queues_[ctx->wid]; int sent_pkts = 0; + if (queue_users_[qid] == 1) { - sent_pkts = SendBatch(batch, p, qid); + sent_pkts = SendBatch(batch, qid); } else { mcslock_node_t me; mcs_lock(&queue_locks_[qid], &me); - sent_pkts = SendBatch(batch, p, qid); + sent_pkts = SendBatch(batch, qid); mcs_unlock(&queue_locks_[qid], &me); } diff --git a/core/modules/port_out.h b/core/modules/port_out.h index 6017e7695..343e6f547 100644 --- a/core/modules/port_out.h +++ b/core/modules/port_out.h @@ -62,6 +62,8 @@ class PortOut final : public Module { private: Port *port_; + int attr_id_; + bool rpfcheck_; int worker_queues_[Worker::kMaxWorkers]; @@ -69,6 +71,8 @@ class PortOut final : public Module { int queue_users_[MAX_QUEUES_PER_DIR]; mcslock_t queue_locks_[MAX_QUEUES_PER_DIR]; + + int SendBatch(bess::PacketBatch *batch, queue_t qid); }; #endif // BESS_MODULES_PORTOUT_H_ diff --git a/core/packet.cc b/core/packet.cc index a3656b524..a56f85bb1 100644 --- a/core/packet.cc +++ b/core/packet.cc @@ -58,7 +58,9 @@ Packet *Packet::copy(const Packet *src) { bess::utils::CopyInlined(dst->append(src->total_len()), src->head_data(), src->total_len(), true); - + bess::utils::CopyInlined(&dst->metadata_, &src->metadata_, + SNBUF_METADATA, true); + return dst; } diff --git a/core/port.cc b/core/port.cc index 347ebf81f..c832ff92a 100644 --- a/core/port.cc +++ b/core/port.cc @@ -45,9 +45,31 @@ std::map PortBuilder::all_ports_; +static uint64_t cur_ifIndex = 1; +static bool rolled_over = false; + Port *PortBuilder::CreatePort(const std::string &name) const { Port *p = port_generator_(); p->set_name(name); + if (rolled_over) { + for (p->ifIndex = 1; p->ifIndex < UINT64_MAX; p->ifIndex++) { + bool found = false; + for (auto const &pi : all_ports_) { + if (pi.second->ifIndex == p->ifIndex) { + found = true; + break; + } + } + if (!found) { + break; + } + } + } else { + p->ifIndex = ++cur_ifIndex; + if (cur_ifIndex == UINT64_MAX) { + rolled_over = true; + } + } p->set_port_builder(this); return p; } diff --git a/core/port.h b/core/port.h index 193d13105..2bf900c9d 100644 --- a/core/port.h +++ b/core/port.h @@ -293,6 +293,8 @@ class Port { const PortBuilder *port_builder() const { return port_builder_; } + uint64_t get_ifIndex() const { return ifIndex; } + protected: friend class PortBuilder; @@ -301,6 +303,7 @@ class Port { // Current configuration Conf conf_; + uint64_t ifIndex; private: static const size_t kDefaultIncQueueSize = 1024; diff --git a/protobuf/module_msg.proto b/protobuf/module_msg.proto index e00a463a3..73518e626 100644 --- a/protobuf/module_msg.proto +++ b/protobuf/module_msg.proto @@ -770,6 +770,7 @@ message PortIncArg { */ message PortOutArg { string port = 1; /// The portname to connect to. + bool rpfcheck = 2; /// Allow reflection of packets to source port } /**