diff options
Diffstat (limited to 'net')
-rw-r--r-- | net/bpf/test_run.c | 54 | ||||
-rw-r--r-- | net/core/dev.c | 19 | ||||
-rw-r--r-- | net/core/filter.c | 71 | ||||
-rw-r--r-- | net/xdp/xsk.c | 79 | ||||
-rw-r--r-- | net/xdp/xsk_queue.c | 15 | ||||
-rw-r--r-- | net/xdp/xsk_queue.h | 371 |
6 files changed, 318 insertions, 291 deletions
diff --git a/net/bpf/test_run.c b/net/bpf/test_run.c index f79205d4444f..d555c0d8657d 100644 --- a/net/bpf/test_run.c +++ b/net/bpf/test_run.c @@ -15,7 +15,7 @@ #include <trace/events/bpf_test_run.h> static int bpf_test_run(struct bpf_prog *prog, void *ctx, u32 repeat, - u32 *retval, u32 *time) + u32 *retval, u32 *time, bool xdp) { struct bpf_cgroup_storage *storage[MAX_BPF_CGROUP_STORAGE_TYPE] = { NULL }; enum bpf_cgroup_storage_type stype; @@ -41,7 +41,11 @@ static int bpf_test_run(struct bpf_prog *prog, void *ctx, u32 repeat, time_start = ktime_get_ns(); for (i = 0; i < repeat; i++) { bpf_cgroup_storage_set(storage); - *retval = BPF_PROG_RUN(prog, ctx); + + if (xdp) + *retval = bpf_prog_run_xdp(prog, ctx); + else + *retval = BPF_PROG_RUN(prog, ctx); if (signal_pending(current)) { ret = -EINTR; @@ -247,34 +251,53 @@ static int convert___skb_to_skb(struct sk_buff *skb, struct __sk_buff *__skb) return 0; /* make sure the fields we don't use are zeroed */ - if (!range_is_zero(__skb, 0, offsetof(struct __sk_buff, priority))) + if (!range_is_zero(__skb, 0, offsetof(struct __sk_buff, mark))) + return -EINVAL; + + /* mark is allowed */ + + if (!range_is_zero(__skb, offsetofend(struct __sk_buff, mark), + offsetof(struct __sk_buff, priority))) return -EINVAL; /* priority is allowed */ - if (!range_is_zero(__skb, offsetof(struct __sk_buff, priority) + - sizeof_field(struct __sk_buff, priority), + if (!range_is_zero(__skb, offsetofend(struct __sk_buff, priority), offsetof(struct __sk_buff, cb))) return -EINVAL; /* cb is allowed */ - if (!range_is_zero(__skb, offsetof(struct __sk_buff, cb) + - sizeof_field(struct __sk_buff, cb), + if (!range_is_zero(__skb, offsetofend(struct __sk_buff, cb), offsetof(struct __sk_buff, tstamp))) return -EINVAL; /* tstamp is allowed */ + /* wire_len is allowed */ + /* gso_segs is allowed */ - if (!range_is_zero(__skb, offsetof(struct __sk_buff, tstamp) + - sizeof_field(struct __sk_buff, tstamp), + if (!range_is_zero(__skb, offsetofend(struct __sk_buff, gso_segs), sizeof(struct __sk_buff))) return -EINVAL; + skb->mark = __skb->mark; skb->priority = __skb->priority; skb->tstamp = __skb->tstamp; memcpy(&cb->data, __skb->cb, QDISC_CB_PRIV_LEN); + if (__skb->wire_len == 0) { + cb->pkt_len = skb->len; + } else { + if (__skb->wire_len < skb->len || + __skb->wire_len > GSO_MAX_SIZE) + return -EINVAL; + cb->pkt_len = __skb->wire_len; + } + + if (__skb->gso_segs > GSO_MAX_SEGS) + return -EINVAL; + skb_shinfo(skb)->gso_segs = __skb->gso_segs; + return 0; } @@ -285,9 +308,12 @@ static void convert_skb_to___skb(struct sk_buff *skb, struct __sk_buff *__skb) if (!__skb) return; + __skb->mark = skb->mark; __skb->priority = skb->priority; __skb->tstamp = skb->tstamp; memcpy(__skb->cb, &cb->data, QDISC_CB_PRIV_LEN); + __skb->wire_len = cb->pkt_len; + __skb->gso_segs = skb_shinfo(skb)->gso_segs; } int bpf_prog_test_run_skb(struct bpf_prog *prog, const union bpf_attr *kattr, @@ -359,7 +385,7 @@ int bpf_prog_test_run_skb(struct bpf_prog *prog, const union bpf_attr *kattr, ret = convert___skb_to_skb(skb, ctx); if (ret) goto out; - ret = bpf_test_run(prog, skb, repeat, &retval, &duration); + ret = bpf_test_run(prog, skb, repeat, &retval, &duration, false); if (ret) goto out; if (!is_l2) { @@ -416,8 +442,8 @@ int bpf_prog_test_run_xdp(struct bpf_prog *prog, const union bpf_attr *kattr, rxqueue = __netif_get_rx_queue(current->nsproxy->net_ns->loopback_dev, 0); xdp.rxq = &rxqueue->xdp_rxq; - - ret = bpf_test_run(prog, &xdp, repeat, &retval, &duration); + bpf_prog_change_xdp(NULL, prog); + ret = bpf_test_run(prog, &xdp, repeat, &retval, &duration, true); if (ret) goto out; if (xdp.data != data + XDP_PACKET_HEADROOM + NET_IP_ALIGN || @@ -425,6 +451,7 @@ int bpf_prog_test_run_xdp(struct bpf_prog *prog, const union bpf_attr *kattr, size = xdp.data_end - xdp.data; ret = bpf_test_finish(kattr, uattr, xdp.data, size, retval, duration); out: + bpf_prog_change_xdp(prog, NULL); kfree(data); return ret; } @@ -437,8 +464,7 @@ static int verify_user_bpf_flow_keys(struct bpf_flow_keys *ctx) /* flags is allowed */ - if (!range_is_zero(ctx, offsetof(struct bpf_flow_keys, flags) + - sizeof_field(struct bpf_flow_keys, flags), + if (!range_is_zero(ctx, offsetofend(struct bpf_flow_keys, flags), sizeof(struct bpf_flow_keys))) return -EINVAL; diff --git a/net/core/dev.c b/net/core/dev.c index 0ad39c87b7fd..f1b8afcfbc0f 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -8542,7 +8542,17 @@ static int dev_xdp_install(struct net_device *dev, bpf_op_t bpf_op, struct netlink_ext_ack *extack, u32 flags, struct bpf_prog *prog) { + bool non_hw = !(flags & XDP_FLAGS_HW_MODE); + struct bpf_prog *prev_prog = NULL; struct netdev_bpf xdp; + int err; + + if (non_hw) { + prev_prog = bpf_prog_by_id(__dev_xdp_query(dev, bpf_op, + XDP_QUERY_PROG)); + if (IS_ERR(prev_prog)) + prev_prog = NULL; + } memset(&xdp, 0, sizeof(xdp)); if (flags & XDP_FLAGS_HW_MODE) @@ -8553,7 +8563,14 @@ static int dev_xdp_install(struct net_device *dev, bpf_op_t bpf_op, xdp.flags = flags; xdp.prog = prog; - return bpf_op(dev, &xdp); + err = bpf_op(dev, &xdp); + if (!err && non_hw) + bpf_prog_change_xdp(prev_prog, prog); + + if (prev_prog) + bpf_prog_put(prev_prog); + + return err; } static void dev_xdp_uninstall(struct net_device *dev) diff --git a/net/core/filter.c b/net/core/filter.c index 28b3c258188c..42fd17c48c5f 100644 --- a/net/core/filter.c +++ b/net/core/filter.c @@ -3511,36 +3511,16 @@ err: } static int __bpf_tx_xdp_map(struct net_device *dev_rx, void *fwd, - struct bpf_map *map, - struct xdp_buff *xdp, - u32 index) + struct bpf_map *map, struct xdp_buff *xdp) { - int err; - switch (map->map_type) { case BPF_MAP_TYPE_DEVMAP: - case BPF_MAP_TYPE_DEVMAP_HASH: { - struct bpf_dtab_netdev *dst = fwd; - - err = dev_map_enqueue(dst, xdp, dev_rx); - if (unlikely(err)) - return err; - break; - } - case BPF_MAP_TYPE_CPUMAP: { - struct bpf_cpu_map_entry *rcpu = fwd; - - err = cpu_map_enqueue(rcpu, xdp, dev_rx); - if (unlikely(err)) - return err; - break; - } - case BPF_MAP_TYPE_XSKMAP: { - struct xdp_sock *xs = fwd; - - err = __xsk_map_redirect(map, xdp, xs); - return err; - } + case BPF_MAP_TYPE_DEVMAP_HASH: + return dev_map_enqueue(fwd, xdp, dev_rx); + case BPF_MAP_TYPE_CPUMAP: + return cpu_map_enqueue(fwd, xdp, dev_rx); + case BPF_MAP_TYPE_XSKMAP: + return __xsk_map_redirect(fwd, xdp); default: break; } @@ -3549,26 +3529,9 @@ static int __bpf_tx_xdp_map(struct net_device *dev_rx, void *fwd, void xdp_do_flush_map(void) { - struct bpf_redirect_info *ri = this_cpu_ptr(&bpf_redirect_info); - struct bpf_map *map = ri->map_to_flush; - - ri->map_to_flush = NULL; - if (map) { - switch (map->map_type) { - case BPF_MAP_TYPE_DEVMAP: - case BPF_MAP_TYPE_DEVMAP_HASH: - __dev_map_flush(map); - break; - case BPF_MAP_TYPE_CPUMAP: - __cpu_map_flush(map); - break; - case BPF_MAP_TYPE_XSKMAP: - __xsk_map_flush(map); - break; - default: - break; - } - } + __dev_map_flush(); + __cpu_map_flush(); + __xsk_map_flush(); } EXPORT_SYMBOL_GPL(xdp_do_flush_map); @@ -3617,14 +3580,10 @@ static int xdp_do_redirect_map(struct net_device *dev, struct xdp_buff *xdp, ri->tgt_value = NULL; WRITE_ONCE(ri->map, NULL); - if (ri->map_to_flush && unlikely(ri->map_to_flush != map)) - xdp_do_flush_map(); - - err = __bpf_tx_xdp_map(dev, fwd, map, xdp, index); + err = __bpf_tx_xdp_map(dev, fwd, map, xdp); if (unlikely(err)) goto err; - ri->map_to_flush = map; _trace_xdp_redirect_map(dev, xdp_prog, fwd, map, index); return 0; err: @@ -8941,3 +8900,11 @@ const struct bpf_verifier_ops sk_reuseport_verifier_ops = { const struct bpf_prog_ops sk_reuseport_prog_ops = { }; #endif /* CONFIG_INET */ + +DEFINE_BPF_DISPATCHER(bpf_dispatcher_xdp) + +void bpf_prog_change_xdp(struct bpf_prog *prev_prog, struct bpf_prog *prog) +{ + bpf_dispatcher_change_prog(BPF_DISPATCHER_PTR(bpf_dispatcher_xdp), + prev_prog, prog); +} diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c index 328f661b83b2..02ada7ab8c6e 100644 --- a/net/xdp/xsk.c +++ b/net/xdp/xsk.c @@ -31,6 +31,8 @@ #define TX_BATCH_SIZE 16 +static DEFINE_PER_CPU(struct list_head, xskmap_flush_list); + bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs) { return READ_ONCE(xs->rx) && READ_ONCE(xs->umem) && @@ -39,21 +41,21 @@ bool xsk_is_setup_for_bpf_map(struct xdp_sock *xs) bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt) { - return xskq_has_addrs(umem->fq, cnt); + return xskq_cons_has_entries(umem->fq, cnt); } EXPORT_SYMBOL(xsk_umem_has_addrs); -u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr) +bool xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr) { - return xskq_peek_addr(umem->fq, addr, umem); + return xskq_cons_peek_addr(umem->fq, addr, umem); } EXPORT_SYMBOL(xsk_umem_peek_addr); -void xsk_umem_discard_addr(struct xdp_umem *umem) +void xsk_umem_release_addr(struct xdp_umem *umem) { - xskq_discard_addr(umem->fq); + xskq_cons_release(umem->fq); } -EXPORT_SYMBOL(xsk_umem_discard_addr); +EXPORT_SYMBOL(xsk_umem_release_addr); void xsk_set_rx_need_wakeup(struct xdp_umem *umem) { @@ -124,7 +126,7 @@ static void __xsk_rcv_memcpy(struct xdp_umem *umem, u64 addr, void *from_buf, void *to_buf = xdp_umem_get_data(umem, addr); addr = xsk_umem_add_offset_to_addr(addr); - if (xskq_crosses_non_contig_pg(umem, addr, len + metalen)) { + if (xskq_cons_crosses_non_contig_pg(umem, addr, len + metalen)) { void *next_pg_addr = umem->pages[(addr >> PAGE_SHIFT) + 1].addr; u64 page_start = addr & ~(PAGE_SIZE - 1); u64 first_len = PAGE_SIZE - (addr - page_start); @@ -146,7 +148,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) u32 metalen; int err; - if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) || + if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) || len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) { xs->rx_dropped++; return -ENOSPC; @@ -165,9 +167,9 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) offset += metalen; addr = xsk_umem_adjust_offset(xs->umem, addr, offset); - err = xskq_produce_batch_desc(xs->rx, addr, len); + err = xskq_prod_reserve_desc(xs->rx, addr, len); if (!err) { - xskq_discard_addr(xs->umem->fq); + xskq_cons_release(xs->umem->fq); xdp_return_buff(xdp); return 0; } @@ -178,7 +180,7 @@ static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len) { - int err = xskq_produce_batch_desc(xs->rx, (u64)xdp->handle, len); + int err = xskq_prod_reserve_desc(xs->rx, xdp->handle, len); if (err) xs->rx_dropped++; @@ -214,7 +216,7 @@ static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) static void xsk_flush(struct xdp_sock *xs) { - xskq_produce_flush_desc(xs->rx); + xskq_prod_submit(xs->rx); xs->sk.sk_data_ready(&xs->sk); } @@ -234,7 +236,7 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) goto out_unlock; } - if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) || + if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) || len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) { err = -ENOSPC; goto out_drop; @@ -245,12 +247,12 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp) memcpy(buffer, xdp->data_meta, len + metalen); addr = xsk_umem_adjust_offset(xs->umem, addr, metalen); - err = xskq_produce_batch_desc(xs->rx, addr, len); + err = xskq_prod_reserve_desc(xs->rx, addr, len); if (err) goto out_drop; - xskq_discard_addr(xs->umem->fq); - xskq_produce_flush_desc(xs->rx); + xskq_cons_release(xs->umem->fq); + xskq_prod_submit(xs->rx); spin_unlock_bh(&xs->rx_lock); @@ -264,11 +266,9 @@ out_unlock: return err; } -int __xsk_map_redirect(struct bpf_map *map, struct xdp_buff *xdp, - struct xdp_sock *xs) +int __xsk_map_redirect(struct xdp_sock *xs, struct xdp_buff *xdp) { - struct xsk_map *m = container_of(map, struct xsk_map, map); - struct list_head *flush_list = this_cpu_ptr(m->flush_list); + struct list_head *flush_list = this_cpu_ptr(&xskmap_flush_list); int err; err = xsk_rcv(xs, xdp); @@ -281,10 +281,9 @@ int __xsk_map_redirect(struct bpf_map *map, struct xdp_buff *xdp, return 0; } -void __xsk_map_flush(struct bpf_map *map) +void __xsk_map_flush(void) { - struct xsk_map *m = container_of(map, struct xsk_map, map); - struct list_head *flush_list = this_cpu_ptr(m->flush_list); + struct list_head *flush_list = this_cpu_ptr(&xskmap_flush_list); struct xdp_sock *xs, *tmp; list_for_each_entry_safe(xs, tmp, flush_list, flush_node) { @@ -295,7 +294,7 @@ void __xsk_map_flush(struct bpf_map *map) void xsk_umem_complete_tx(struct xdp_umem *umem, u32 nb_entries) { - xskq_produce_flush_addr_n(umem->cq, nb_entries); + xskq_prod_submit_n(umem->cq, nb_entries); } EXPORT_SYMBOL(xsk_umem_complete_tx); @@ -317,13 +316,18 @@ bool xsk_umem_consume_tx(struct xdp_umem *umem, struct xdp_desc *desc) rcu_read_lock(); list_for_each_entry_rcu(xs, &umem->xsk_list, list) { - if (!xskq_peek_desc(xs->tx, desc, umem)) + if (!xskq_cons_peek_desc(xs->tx, desc, umem)) continue; - if (xskq_produce_addr_lazy(umem->cq, desc->addr)) + /* This is the backpreassure mechanism for the Tx path. + * Reserve space in the completion queue and only proceed + * if there is space in it. This avoids having to implement + * any buffering in the Tx path. + */ + if (xskq_prod_reserve_addr(umem->cq, desc->addr)) goto out; - xskq_discard_desc(xs->tx); + xskq_cons_release(xs->tx); rcu_read_unlock(); return true; } @@ -358,7 +362,7 @@ static void xsk_destruct_skb(struct sk_buff *skb) unsigned long flags; spin_lock_irqsave(&xs->tx_completion_lock, flags); - WARN_ON_ONCE(xskq_produce_addr(xs->umem->cq, addr)); + xskq_prod_submit_addr(xs->umem->cq, addr); spin_unlock_irqrestore(&xs->tx_completion_lock, flags); sock_wfree(skb); @@ -378,7 +382,7 @@ static int xsk_generic_xmit(struct sock *sk) if (xs->queue_id >= xs->dev->real_num_tx_queues) goto out; - while (xskq_peek_desc(xs->tx, &desc, xs->umem)) { + while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) { char *buffer; u64 addr; u32 len; @@ -399,7 +403,12 @@ static int xsk_generic_xmit(struct sock *sk) addr = desc.addr; buffer = xdp_umem_get_data(xs->umem, addr); err = skb_store_bits(skb, 0, buffer, len); - if (unlikely(err) || xskq_reserve_addr(xs->umem->cq)) { + /* This is the backpreassure mechanism for the Tx path. + * Reserve space in the completion queue and only proceed + * if there is space in it. This avoids having to implement + * any buffering in the Tx path. + */ + if (unlikely(err) || xskq_prod_reserve(xs->umem->cq)) { kfree_skb(skb); goto out; } @@ -411,7 +420,7 @@ static int xsk_generic_xmit(struct sock *sk) skb->destructor = xsk_destruct_skb; err = dev_direct_xmit(skb, xs->queue_id); - xskq_discard_desc(xs->tx); + xskq_cons_release(xs->tx); /* Ignore NET_XMIT_CN as packet might have been sent */ if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) { /* SKB completed but not sent */ @@ -477,9 +486,9 @@ static __poll_t xsk_poll(struct file *file, struct socket *sock, __xsk_sendmsg(sk); } - if (xs->rx && !xskq_empty_desc(xs->rx)) + if (xs->rx && !xskq_prod_is_empty(xs->rx)) mask |= EPOLLIN | EPOLLRDNORM; - if (xs->tx && !xskq_full_desc(xs->tx)) + if (xs->tx && !xskq_cons_is_full(xs->tx)) mask |= EPOLLOUT | EPOLLWRNORM; return mask; @@ -1183,7 +1192,7 @@ static struct pernet_operations xsk_net_ops = { static int __init xsk_init(void) { - int err; + int err, cpu; err = proto_register(&xsk_proto, 0 /* no slab */); if (err) @@ -1201,6 +1210,8 @@ static int __init xsk_init(void) if (err) goto out_pernet; + for_each_possible_cpu(cpu) + INIT_LIST_HEAD(&per_cpu(xskmap_flush_list, cpu)); return 0; out_pernet: diff --git a/net/xdp/xsk_queue.c b/net/xdp/xsk_queue.c index b66504592d9b..c90e9c1e3c63 100644 --- a/net/xdp/xsk_queue.c +++ b/net/xdp/xsk_queue.c @@ -18,14 +18,14 @@ void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask) q->chunk_mask = chunk_mask; } -static u32 xskq_umem_get_ring_size(struct xsk_queue *q) +static size_t xskq_get_ring_size(struct xsk_queue *q, bool umem_queue) { - return sizeof(struct xdp_umem_ring) + q->nentries * sizeof(u64); -} + struct xdp_umem_ring *umem_ring; + struct xdp_rxtx_ring *rxtx_ring; -static u32 xskq_rxtx_get_ring_size(struct xsk_queue *q) -{ - return sizeof(struct xdp_ring) + q->nentries * sizeof(struct xdp_desc); + if (umem_queue) + return struct_size(umem_ring, desc, q->nentries); + return struct_size(rxtx_ring, desc, q->nentries); } struct xsk_queue *xskq_create(u32 nentries, bool umem_queue) @@ -43,8 +43,7 @@ struct xsk_queue *xskq_create(u32 nentries, bool umem_queue) gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP | __GFP_NORETRY; - size = umem_queue ? xskq_umem_get_ring_size(q) : - xskq_rxtx_get_ring_size(q); + size = xskq_get_ring_size(q, umem_queue); q->ring = (struct xdp_ring *)__get_free_pages(gfp_flags, get_order(size)); diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h index eddae4688862..bec2af11853a 100644 --- a/net/xdp/xsk_queue.h +++ b/net/xdp/xsk_queue.h @@ -10,9 +10,6 @@ #include <linux/if_xdp.h> #include <net/xdp_sock.h> -#define RX_BATCH_SIZE 16 -#define LAZY_UPDATE_THRESHOLD 128 - struct xdp_ring { u32 producer ____cacheline_aligned_in_smp; u32 consumer ____cacheline_aligned_in_smp; @@ -36,10 +33,8 @@ struct xsk_queue { u64 size; u32 ring_mask; u32 nentries; - u32 prod_head; - u32 prod_tail; - u32 cons_head; - u32 cons_tail; + u32 cached_prod; + u32 cached_cons; struct xdp_ring *ring; u64 invalid_descs; }; @@ -86,56 +81,31 @@ struct xsk_queue { * now and again after circling through the ring. */ -/* Common functions operating for both RXTX and umem queues */ - -static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) -{ - return q ? q->invalid_descs : 0; -} - -static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt) -{ - u32 entries = q->prod_tail - q->cons_tail; - - if (entries == 0) { - /* Refresh the local pointer */ - q->prod_tail = READ_ONCE(q->ring->producer); - entries = q->prod_tail - q->cons_tail; - } - - return (entries > dcnt) ? dcnt : entries; -} - -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt) -{ - u32 free_entries = q->nentries - (producer - q->cons_tail); - - if (free_entries >= dcnt) - return free_entries; - - /* Refresh the local tail pointer */ - q->cons_tail = READ_ONCE(q->ring->consumer); - return q->nentries - (producer - q->cons_tail); -} - -static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt) -{ - u32 entries = q->prod_tail - q->cons_tail; - - if (entries >= cnt) - return true; - - /* Refresh the local pointer. */ - q->prod_tail = READ_ONCE(q->ring->producer); - entries = q->prod_tail - q->cons_tail; - - return entries >= cnt; -} +/* The operations on the rings are the following: + * + * producer consumer + * + * RESERVE entries PEEK in the ring for entries + * WRITE data into the ring READ data from the ring + * SUBMIT entries RELEASE entries + * + * The producer reserves one or more entries in the ring. It can then + * fill in these entries and finally submit them so that they can be + * seen and read by the consumer. + * + * The consumer peeks into the ring to see if the producer has written + * any new entries. If so, the producer can then read these entries + * and when it is done reading them release them back to the producer + * so that the producer can use these slots to fill in new entries. + * + * The function names below reflect these operations. + */ -/* UMEM queue */ +/* Functions that read and validate content from consumer rings. */ -static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr, - u64 length) +static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem, + u64 addr, + u64 length) { bool cross_pg = (addr & (PAGE_SIZE - 1)) + length > PAGE_SIZE; bool next_pg_contig = @@ -145,9 +115,16 @@ static inline bool xskq_crosses_non_contig_pg(struct xdp_umem *umem, u64 addr, return cross_pg && !next_pg_contig; } -static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) +static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q, + u64 addr, + u64 length, + struct xdp_umem *umem) { - if (addr >= q->size) { + u64 base_addr = xsk_umem_extract_addr(addr); + + addr = xsk_umem_add_offset_to_addr(addr); + if (base_addr >= q->size || addr >= q->size || + xskq_cons_crosses_non_contig_pg(umem, addr, length)) { q->invalid_descs++; return false; } @@ -155,15 +132,9 @@ static inline bool xskq_is_valid_addr(struct xsk_queue *q, u64 addr) return true; } -static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr, - u64 length, - struct xdp_umem *umem) +static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr) { - u64 base_addr = xsk_umem_extract_addr(addr); - - addr = xsk_umem_add_offset_to_addr(addr); - if (base_addr >= q->size || addr >= q->size || - xskq_crosses_non_contig_pg(umem, addr, length)) { + if (addr >= q->size) { q->invalid_descs++; return false; } @@ -171,204 +142,240 @@ static inline bool xskq_is_valid_addr_unaligned(struct xsk_queue *q, u64 addr, return true; } -static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr, - struct xdp_umem *umem) +static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr, + struct xdp_umem *umem) { - while (q->cons_tail != q->cons_head) { - struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - unsigned int idx = q->cons_tail & q->ring_mask; + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - *addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask; + while (q->cached_cons != q->cached_prod) { + u32 idx = q->cached_cons & q->ring_mask; + + *addr = ring->desc[idx] & q->chunk_mask; if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) { - if (xskq_is_valid_addr_unaligned(q, *addr, + if (xskq_cons_is_valid_unaligned(q, *addr, umem->chunk_size_nohr, umem)) - return addr; + return true; goto out; } - if (xskq_is_valid_addr(q, *addr)) - return addr; + if (xskq_cons_is_valid_addr(q, *addr)) + return true; out: - q->cons_tail++; + q->cached_cons++; } - return NULL; + return false; } -static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr, - struct xdp_umem *umem) +static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q, + struct xdp_desc *d, + struct xdp_umem *umem) { - if (q->cons_tail == q->cons_head) { - smp_mb(); /* D, matches A */ - WRITE_ONCE(q->ring->consumer, q->cons_tail); - q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); + if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) { + if (!xskq_cons_is_valid_unaligned(q, d->addr, d->len, umem)) + return false; + + if (d->len > umem->chunk_size_nohr || d->options) { + q->invalid_descs++; + return false; + } - /* Order consumer and data */ - smp_rmb(); + return true; } - return xskq_validate_addr(q, addr, umem); -} + if (!xskq_cons_is_valid_addr(q, d->addr)) + return false; -static inline void xskq_discard_addr(struct xsk_queue *q) -{ - q->cons_tail++; + if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) || + d->options) { + q->invalid_descs++; + return false; + } + + return true; } -static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr) +static inline bool xskq_cons_read_desc(struct xsk_queue *q, + struct xdp_desc *desc, + struct xdp_umem *umem) { - struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - - if (xskq_nb_free(q, q->prod_tail, 1) == 0) - return -ENOSPC; + while (q->cached_cons != q->cached_prod) { + struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; + u32 idx = q->cached_cons & q->ring_mask; - /* A, matches D */ - ring->desc[q->prod_tail++ & q->ring_mask] = addr; + *desc = ring->desc[idx]; + if (xskq_cons_is_valid_desc(q, desc, umem)) + return true; - /* Order producer and data */ - smp_wmb(); /* B, matches C */ + q->cached_cons++; + } - WRITE_ONCE(q->ring->producer, q->prod_tail); - return 0; + return false; } -static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr) +/* Functions for consumers */ + +static inline void __xskq_cons_release(struct xsk_queue *q) { - struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + smp_mb(); /* D, matches A */ + WRITE_ONCE(q->ring->consumer, q->cached_cons); +} - if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0) - return -ENOSPC; +static inline void __xskq_cons_peek(struct xsk_queue *q) +{ + /* Refresh the local pointer */ + q->cached_prod = READ_ONCE(q->ring->producer); + smp_rmb(); /* C, matches B */ +} - /* A, matches D */ - ring->desc[q->prod_head++ & q->ring_mask] = addr; - return 0; +static inline void xskq_cons_get_entries(struct xsk_queue *q) +{ + __xskq_cons_release(q); + __xskq_cons_peek(q); } -static inline void xskq_produce_flush_addr_n(struct xsk_queue *q, - u32 nb_entries) +static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt) { - /* Order producer and data */ - smp_wmb(); /* B, matches C */ + u32 entries = q->cached_prod - q->cached_cons; - q->prod_tail += nb_entries; - WRITE_ONCE(q->ring->producer, q->prod_tail); + if (entries >= cnt) + return true; + + __xskq_cons_peek(q); + entries = q->cached_prod - q->cached_cons; + + return entries >= cnt; } -static inline int xskq_reserve_addr(struct xsk_queue *q) +static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr, + struct xdp_umem *umem) { - if (xskq_nb_free(q, q->prod_head, 1) == 0) - return -ENOSPC; + if (q->cached_prod == q->cached_cons) + xskq_cons_get_entries(q); + return xskq_cons_read_addr(q, addr, umem); +} - /* A, matches D */ - q->prod_head++; - return 0; +static inline bool xskq_cons_peek_desc(struct xsk_queue *q, + struct xdp_desc *desc, + struct xdp_umem *umem) +{ + if (q->cached_prod == q->cached_cons) + xskq_cons_get_entries(q); + return xskq_cons_read_desc(q, desc, umem); } -/* Rx/Tx queue */ +static inline void xskq_cons_release(struct xsk_queue *q) +{ + /* To improve performance, only update local state here. + * Reflect this to global state when we get new entries + * from the ring in xskq_cons_get_entries(). + */ + q->cached_cons++; +} -static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d, - struct xdp_umem *umem) +static inline bool xskq_cons_is_full(struct xsk_queue *q) { - if (umem->flags & XDP_UMEM_UNALIGNED_CHUNK_FLAG) { - if (!xskq_is_valid_addr_unaligned(q, d->addr, d->len, umem)) - return false; + /* No barriers needed since data is not accessed */ + return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) == + q->nentries; +} - if (d->len > umem->chunk_size_nohr || d->options) { - q->invalid_descs++; - return false; - } +/* Functions for producers */ - return true; - } +static inline bool xskq_prod_is_full(struct xsk_queue *q) +{ + u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons); - if (!xskq_is_valid_addr(q, d->addr)) + if (free_entries) return false; - if (((d->addr + d->len) & q->chunk_mask) != (d->addr & q->chunk_mask) || - d->options) { - q->invalid_descs++; - return false; - } + /* Refresh the local tail pointer */ + q->cached_cons = READ_ONCE(q->ring->consumer); + free_entries = q->nentries - (q->cached_prod - q->cached_cons); - return true; + return !free_entries; } -static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q, - struct xdp_desc *desc, - struct xdp_umem *umem) +static inline int xskq_prod_reserve(struct xsk_queue *q) { - while (q->cons_tail != q->cons_head) { - struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; - unsigned int idx = q->cons_tail & q->ring_mask; - - *desc = READ_ONCE(ring->desc[idx]); - if (xskq_is_valid_desc(q, desc, umem)) - return desc; - - q->cons_tail++; - } + if (xskq_prod_is_full(q)) + return -ENOSPC; - return NULL; + /* A, matches D */ + q->cached_prod++; + return 0; } -static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q, - struct xdp_desc *desc, - struct xdp_umem *umem) +static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr) { - if (q->cons_tail == q->cons_head) { - smp_mb(); /* D, matches A */ - WRITE_ONCE(q->ring->consumer, q->cons_tail); - q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE); - - /* Order consumer and data */ - smp_rmb(); /* C, matches B */ - } + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; - return xskq_validate_desc(q, desc, umem); -} + if (xskq_prod_is_full(q)) + return -ENOSPC; -static inline void xskq_discard_desc(struct xsk_queue *q) -{ - q->cons_tail++; + /* A, matches D */ + ring->desc[q->cached_prod++ & q->ring_mask] = addr; + return 0; } -static inline int xskq_produce_batch_desc(struct xsk_queue *q, - u64 addr, u32 len) +static inline int xskq_prod_reserve_desc(struct xsk_queue *q, + u64 addr, u32 len) { struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring; - unsigned int idx; + u32 idx; - if (xskq_nb_free(q, q->prod_head, 1) == 0) + if (xskq_prod_is_full(q)) return -ENOSPC; /* A, matches D */ - idx = (q->prod_head++) & q->ring_mask; + idx = q->cached_prod++ & q->ring_mask; ring->desc[idx].addr = addr; ring->desc[idx].len = len; return 0; } -static inline void xskq_produce_flush_desc(struct xsk_queue *q) +static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx) { - /* Order producer and data */ smp_wmb(); /* B, matches C */ - q->prod_tail = q->prod_head; - WRITE_ONCE(q->ring->producer, q->prod_tail); + WRITE_ONCE(q->ring->producer, idx); +} + +static inline void xskq_prod_submit(struct xsk_queue *q) +{ + __xskq_prod_submit(q, q->cached_prod); +} + +static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr) +{ + struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring; + u32 idx = q->ring->producer; + + ring->desc[idx++ & q->ring_mask] = addr; + + __xskq_prod_submit(q, idx); } -static inline bool xskq_full_desc(struct xsk_queue *q) +static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries) { - return xskq_nb_avail(q, q->nentries) == q->nentries; + __xskq_prod_submit(q, q->ring->producer + nb_entries); } -static inline bool xskq_empty_desc(struct xsk_queue *q) +static inline bool xskq_prod_is_empty(struct xsk_queue *q) { - return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries; + /* No barriers needed since data is not accessed */ + return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer); +} + +/* For both producers and consumers */ + +static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q) +{ + return q ? q->invalid_descs : 0; } void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask); |