Skip to content

Commit a07fe39

Browse files
committed
prov/efa: Clean up rxe map during rxe release
An rxe can be inserted into the rxe map when the receiver gets multi-req packets (medium, runting). If the receiving ep is closed before the receive completes, the stale entry left in the rxe map causes an assertion error when the rxe_map buffer pool is destroyed during ep close. This patch fixes the issue by recording in the rxe which rxe_map it was inserted into, and removing the rxe from that map during rxe release. Signed-off-by: Shi Jin <[email protected]>
1 parent 103f94f commit a07fe39

File tree

11 files changed

+63
-22
lines changed

11 files changed

+63
-22
lines changed

prov/efa/src/rdm/efa_rdm_ep_utils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep, fi_addr_t addr,
124124
rxe->op = op;
125125
rxe->peer_rxe = NULL;
126126
rxe->unexp_pkt = NULL;
127+
rxe->rxe_map = NULL;
127128
rxe->atomrsp_data = NULL;
128129
rxe->bytes_read_total_len = 0;
129130

prov/efa/src/rdm/efa_rdm_msg.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_msgrtm(struct efa_rdm_ep *ep,
797797

798798
pkt_type = efa_rdm_pke_get_base_hdr(*pkt_entry_ptr)->type;
799799
if (efa_rdm_pkt_type_is_mulreq(pkt_type))
800-
efa_rdm_rxe_map_insert(&ep->rxe_map, *pkt_entry_ptr, rxe);
800+
efa_rdm_rxe_map_insert(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(*pkt_entry_ptr), (*pkt_entry_ptr)->addr, rxe);
801801

802802
return rxe;
803803
}
@@ -874,7 +874,7 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_tagrtm(struct efa_rdm_ep *ep,
874874

875875
pkt_type = efa_rdm_pke_get_base_hdr(*pkt_entry_ptr)->type;
876876
if (efa_rdm_pkt_type_is_mulreq(pkt_type))
877-
efa_rdm_rxe_map_insert(&ep->rxe_map, *pkt_entry_ptr, rxe);
877+
efa_rdm_rxe_map_insert(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(*pkt_entry_ptr), (*pkt_entry_ptr)->addr, rxe);
878878

879879
return rxe;
880880
}

prov/efa/src/rdm/efa_rdm_ope.c

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe)
167167

168168
dlist_remove(&rxe->ep_entry);
169169

170+
if (rxe->rxe_map)
171+
efa_rdm_rxe_map_remove(rxe->rxe_map, rxe->msg_id, rxe->addr, rxe);
172+
170173
for (i = 0; i < rxe->iov_count; i++) {
171174
if (rxe->mr[i]) {
172175
err = fi_close((struct fid *)rxe->mr[i]);
@@ -1096,12 +1099,6 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope)
10961099
efa_rdm_rxe_report_completion(rxe);
10971100
}
10981101

1099-
if (ope->internal_flags & EFA_RDM_OPE_READ_NACK) {
1100-
assert(ope->type == EFA_RDM_RXE);
1101-
/* Apply to both DC and non-DC */
1102-
efa_rdm_rxe_map_remove(&ope->ep->rxe_map, ope->msg_id, ope->peer->efa_fiaddr, ope);
1103-
}
1104-
11051102
/* As can be seen, this function does not release rxe when
11061103
* efa_rdm_ope_post_send_or_queue() was successful.
11071104
*

prov/efa/src/rdm/efa_rdm_ope.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ struct efa_rdm_ope {
153153
struct efa_rdm_pke *unexp_pkt;
154154
char *atomrsp_data;
155155
enum efa_rdm_cuda_copy_method cuda_copy_method;
156+
/* the rxe_map this rxe is currently inserted into, or NULL if it is not in any map */
157+
struct efa_rdm_rxe_map *rxe_map;
156158
/* end of RX related variables */
157159
/* the following variables are for TX operation only */
158160
uint64_t bytes_acked;

prov/efa/src/rdm/efa_rdm_pke_rtm.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ ssize_t efa_rdm_pke_proc_msgrtm(struct efa_rdm_pke *pkt_entry)
281281

282282
rtm_hdr = (struct efa_rdm_rtm_base_hdr *)pkt_entry->wiredata;
283283
if (rtm_hdr->flags & EFA_RDM_REQ_READ_NACK) {
284-
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, pkt_entry);
284+
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr);
285285
rxe->internal_flags |= EFA_RDM_OPE_READ_NACK;
286286
} else {
287287
rxe = efa_rdm_msg_alloc_rxe_for_msgrtm(ep, &pkt_entry);
@@ -329,7 +329,7 @@ ssize_t efa_rdm_pke_proc_tagrtm(struct efa_rdm_pke *pkt_entry)
329329

330330
rtm_hdr = (struct efa_rdm_rtm_base_hdr *) pkt_entry->wiredata;
331331
if (rtm_hdr->flags & EFA_RDM_REQ_READ_NACK) {
332-
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, pkt_entry);
332+
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr);
333333
rxe->internal_flags |= EFA_RDM_OPE_READ_NACK;
334334
} else {
335335
rxe = efa_rdm_msg_alloc_rxe_for_tagrtm(ep, &pkt_entry);
@@ -446,7 +446,7 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
446446
struct efa_rdm_ope *rxe;
447447
struct efa_rdm_pke *unexp_pkt_entry;
448448

449-
rxe = efa_rdm_rxe_map_lookup(&pkt_entry->ep->rxe_map, pkt_entry);
449+
rxe = efa_rdm_rxe_map_lookup(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr);
450450
if (rxe) {
451451
if (rxe->state == EFA_RDM_RXE_MATCHED) {
452452
pkt_entry->ope = rxe;

prov/efa/src/rdm/efa_rdm_pke_utils.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "efa_rdm_pke.h"
88
#include "efa_rdm_protocol.h"
99
#include "efa_rdm_pkt_type.h"
10+
#include "efa_rdm_pke_rtm.h"
1011
#include "efa_mr.h"
1112

1213
/**
@@ -160,7 +161,7 @@ efa_rdm_pke_post_remote_read_or_nack(struct efa_rdm_ep *ep,
160161
}
161162

162163
if (efa_rdm_pkt_type_is_rtm(pkt_type)) {
163-
efa_rdm_rxe_map_insert(&ep->rxe_map, pkt_entry, rxe);
164+
efa_rdm_rxe_map_insert(&ep->rxe_map, efa_rdm_pke_get_rtm_msg_id(pkt_entry), pkt_entry->addr, rxe);
164165
}
165166

166167
return efa_rdm_ope_post_send_or_queue(rxe, EFA_RDM_READ_NACK_PKT);

prov/efa/src/rdm/efa_rdm_rxe_map.c

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
* pointer to an RX entry. If such RX entry does not exist, return NULL
1616
*/
1717
struct efa_rdm_ope *efa_rdm_rxe_map_lookup(struct efa_rdm_rxe_map *rxe_map,
18-
struct efa_rdm_pke *pkt_entry)
18+
uint64_t msg_id, fi_addr_t addr)
1919
{
2020
struct efa_rdm_rxe_map_entry *entry = NULL;
2121
struct efa_rdm_rxe_map_key key;
2222

2323
memset(&key, 0, sizeof(key));
24-
key.msg_id = efa_rdm_pke_get_rtm_msg_id(pkt_entry);
25-
key.addr = pkt_entry->addr;
24+
key.msg_id = msg_id;
25+
key.addr = addr;
2626
HASH_FIND(hh, rxe_map->head, &key, sizeof(struct efa_rdm_rxe_map_key), entry);
2727
return entry ? entry->rxe : NULL;
2828
}
@@ -39,22 +39,22 @@ struct efa_rdm_ope *efa_rdm_rxe_map_lookup(struct efa_rdm_rxe_map *rxe_map,
3939
* @param[in] rxe RX entry
4040
*/
4141
void efa_rdm_rxe_map_insert(struct efa_rdm_rxe_map *rxe_map,
42-
struct efa_rdm_pke *pkt_entry,
42+
uint64_t msg_id, fi_addr_t addr,
4343
struct efa_rdm_ope *rxe)
4444
{
4545
struct efa_rdm_rxe_map_entry *entry;
4646

47-
entry = ofi_buf_alloc(pkt_entry->ep->map_entry_pool);
47+
entry = ofi_buf_alloc(rxe->ep->map_entry_pool);
4848
if (OFI_UNLIKELY(!entry)) {
4949
EFA_WARN(FI_LOG_CQ,
5050
"Map entries for medium size message exhausted.\n");
51-
efa_base_ep_write_eq_error(&pkt_entry->ep->base_ep, FI_ENOBUFS, FI_EFA_ERR_RXE_POOL_EXHAUSTED);
51+
efa_base_ep_write_eq_error(&rxe->ep->base_ep, FI_ENOBUFS, FI_EFA_ERR_RXE_POOL_EXHAUSTED);
5252
return;
5353
}
5454

5555
memset(&entry->key, 0, sizeof(entry->key));
56-
entry->key.msg_id = efa_rdm_pke_get_rtm_msg_id(pkt_entry);
57-
entry->key.addr = pkt_entry->addr;
56+
entry->key.msg_id = msg_id;
57+
entry->key.addr = addr;
5858

5959
#if ENABLE_DEBUG
6060
{
@@ -67,6 +67,7 @@ void efa_rdm_rxe_map_insert(struct efa_rdm_rxe_map *rxe_map,
6767

6868
entry->rxe = rxe;
6969
HASH_ADD(hh, rxe_map->head, key, sizeof(struct efa_rdm_rxe_map_key), entry);
70+
rxe->rxe_map = rxe_map;
7071
}
7172

7273
/**
@@ -95,4 +96,6 @@ void efa_rdm_rxe_map_remove(struct efa_rdm_rxe_map *rxe_map, uint64_t msg_id,
9596
assert(entry && entry->rxe == rxe);
9697
HASH_DEL(rxe_map->head, entry);
9798
ofi_buf_free(entry);
99+
/* The rxe is no longer in the map, so reset its rxe_map pointer to NULL */
100+
rxe->rxe_map = NULL;
98101
}

prov/efa/src/rdm/efa_rdm_rxe_map.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ void efa_rdm_rxe_map_construct(struct efa_rdm_rxe_map *rxe_map)
4949
struct efa_rdm_pke;
5050

5151
struct efa_rdm_ope *efa_rdm_rxe_map_lookup(struct efa_rdm_rxe_map *rxe_map,
52-
struct efa_rdm_pke *pkt_entry);
52+
uint64_t msg_id, fi_addr_t addr);
5353

5454
void efa_rdm_rxe_map_insert(struct efa_rdm_rxe_map *rxe_map,
55-
struct efa_rdm_pke *pkt_entry,
55+
uint64_t msg_id, fi_addr_t addr,
5656
struct efa_rdm_ope *rxe);
5757

5858
void efa_rdm_rxe_map_remove(struct efa_rdm_rxe_map *rxe_map, uint64_t msg_id,

prov/efa/test/efa_unit_test_ope.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,3 +477,38 @@ void test_efa_rdm_rxe_handle_error_not_write_cq(struct efa_resource **state)
477477

478478
efa_rdm_rxe_release(rxe);
479479
}
480+
481+
/**
 * Verify that an rxe inserted into an endpoint's rxe_map records that map in
 * rxe->rxe_map, can be found again via lookup, and is removed from the map
 * when the rxe is released (so the map entry pool can be destroyed).
 *
 * @param[in] state	cmocka test state holding the efa_resource to construct
 */
void test_efa_rdm_rxe_map(struct efa_resource **state)
482+
{
483+
struct efa_resource *resource = *state;
484+
struct efa_rdm_ope *rxe;
485+
struct efa_rdm_ep *efa_rdm_ep;
486+
487+
efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME);
488+
489+
rxe = efa_unit_test_alloc_rxe(resource, ofi_op_tagged);
490+
assert_non_null(rxe);
491+
rxe->msg_id = 1; /* only touch rxe after it is known to be non-NULL */
492+
493+
/* rxe has not been inserted into any rxe_map yet */
494+
assert_null(rxe->rxe_map);
495+
496+
efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep,
497+
base_ep.util_ep.ep_fid);
498+
499+
efa_rdm_rxe_map_insert(&efa_rdm_ep->rxe_map, rxe->msg_id, rxe->addr,
500+
rxe);
501+
assert_true(rxe->rxe_map == &efa_rdm_ep->rxe_map);
502+
assert_true(rxe == efa_rdm_rxe_map_lookup(rxe->rxe_map, rxe->msg_id,
503+
rxe->addr));
504+
505+
efa_rdm_rxe_release(rxe);
506+
507+
/**
508+
* Now the map_entry_pool should be empty so we can destroy it.
509+
* Otherwise destroying it would hit an assertion error because
510+
* its use count is non-zero.
511+
*/
512+
ofi_bufpool_destroy(efa_rdm_ep->map_entry_pool);
513+
efa_rdm_ep->map_entry_pool = NULL;
514+
}

prov/efa/test/efa_unit_tests.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ int main(void)
212212
cmocka_unit_test_setup_teardown(test_efa_rdm_txe_handle_error_not_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
213213
cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_handle_error_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
214214
cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_handle_error_not_write_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
215+
cmocka_unit_test_setup_teardown(test_efa_rdm_rxe_map, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
215216
cmocka_unit_test_setup_teardown(test_efa_rdm_msg_send_to_local_peer_with_null_desc, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
216217
cmocka_unit_test_setup_teardown(test_efa_fork_support_request_initialize_when_ibv_fork_support_is_needed, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
217218
cmocka_unit_test_setup_teardown(test_efa_fork_support_request_initialize_when_ibv_fork_support_is_unneeded, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),

0 commit comments

Comments
 (0)