summaryrefslogtreecommitdiff
path: root/fs/dlm/rcom.c
diff options
context:
space:
mode:
authorAlexander Aring <aahringo@redhat.com>2021-05-21 15:08:41 -0400
committerDavid Teigland <teigland@redhat.com>2021-05-25 09:22:20 -0500
commita070a91cf1402b5328d3517d1fccbdeec58d3f2d (patch)
tree3cc4204a0693629e2074cf344280774863b4a21e /fs/dlm/rcom.c
parent6fb5cf9d4206f2cdccb05be1bf2307dab4e5babe (diff)
fs: dlm: add more midcomms hooks
This patch prepares hooks to redirect to the midcomms layer which will be used by the midcomms re-transmit handling. There exists the new concept of stateless buffers allocation and commits. This can be used to bypass the midcomms re-transmit handling. It is used by RCOM_STATUS and RCOM_NAMES messages, because they have their own ping-like re-transmit handling. As well these two messages will be used to determine the DLM version per node, because these two messages are per observation the first messages which are exchanged. Cluster manager events for node membership are added to add support for half-closed connections in cases that the peer connection get to an end of file but DLM still holds membership of the node. In this time DLM can still trigger new message which we should allow. After the cluster manager node removal event occurs it safe to close the connection. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/rcom.c')
-rw-r--r--fs/dlm/rcom.c101
1 files changed, 71 insertions, 30 deletions
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f5b1bd65728d..2661674364af 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -27,20 +27,10 @@ static int rcom_response(struct dlm_ls *ls)
return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
}
-static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
- struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
+static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
+ struct dlm_rcom **rc_ret, char *mb, int mb_len)
{
struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
- char *mb;
- int mb_len = sizeof(struct dlm_rcom) + len;
-
- mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
- if (!mh) {
- log_print("create_rcom to %d type %d len %d ENOBUFS",
- to_nodeid, type, len);
- return -ENOBUFS;
- }
rc = (struct dlm_rcom *) mb;
@@ -56,15 +46,64 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
rc->rc_seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
- *mh_ret = mh;
*rc_ret = rc;
+}
+
+static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
+ struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
+{
+ int mb_len = sizeof(struct dlm_rcom) + len;
+ struct dlm_mhandle *mh;
+ char *mb;
+
+ mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
+ if (!mh) {
+ log_print("%s to %d type %d len %d ENOBUFS",
+ __func__, to_nodeid, type, len);
+ return -ENOBUFS;
+ }
+
+ _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
+ *mh_ret = mh;
return 0;
}
+static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
+ int len, struct dlm_rcom **rc_ret,
+ void **mh_ret)
+{
+ int mb_len = sizeof(struct dlm_rcom) + len;
+ void *mh;
+ char *mb;
+
+ mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
+ if (!mh) {
+ log_print("create_rcom to %d type %d len %d ENOBUFS",
+ to_nodeid, type, len);
+ return -ENOBUFS;
+ }
+
+ _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
+ *mh_ret = mh;
+ return 0;
+}
+
+static void _send_rcom(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+ dlm_rcom_out(rc);
+}
+
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
struct dlm_rcom *rc)
{
- dlm_rcom_out(rc);
+ _send_rcom(ls, rc);
+ dlm_midcomms_commit_mhandle(mh);
+}
+
+static void send_rcom_stateless(struct dlm_ls *ls, void *mh,
+ struct dlm_rcom *rc)
+{
+ _send_rcom(ls, rc);
dlm_lowcomms_commit_buffer(mh);
}
@@ -141,8 +180,8 @@ static void disallow_sync_reply(struct dlm_ls *ls)
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
{
struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
int error = 0;
+ void *mh;
ls->ls_recover_nodeid = nodeid;
@@ -153,8 +192,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
}
retry:
- error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
- sizeof(struct rcom_status), &rc, &mh);
+ error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
+ sizeof(struct rcom_status), &rc, &mh);
if (error)
goto out;
@@ -163,7 +202,7 @@ retry:
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
- send_rcom(ls, mh, rc);
+ send_rcom_stateless(ls, mh, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@@ -191,13 +230,13 @@ retry:
static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
struct rcom_status *rs;
uint32_t status;
int nodeid = rc_in->rc_header.h_nodeid;
int len = sizeof(struct rcom_config);
int num_slots = 0;
int error;
+ void *mh;
if (!dlm_slots_version(&rc_in->rc_header)) {
status = dlm_recover_status(ls);
@@ -218,8 +257,8 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
len += num_slots * sizeof(struct rcom_slot);
do_create:
- error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
- len, &rc, &mh);
+ error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
+ len, &rc, &mh);
if (error)
return;
@@ -246,7 +285,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_recover_lock);
do_send:
- send_rcom(ls, mh, rc);
+ send_rcom_stateless(ls, mh, rc);
}
static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@@ -271,13 +310,14 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
{
struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
int error = 0;
+ void *mh;
ls->ls_recover_nodeid = nodeid;
retry:
- error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
+ error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
+ &rc, &mh);
if (error)
goto out;
memcpy(rc->rc_buf, last_name, last_len);
@@ -285,7 +325,7 @@ retry:
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
- send_rcom(ls, mh, rc);
+ send_rcom_stateless(ls, mh, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@@ -298,14 +338,15 @@ retry:
static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
- struct dlm_mhandle *mh;
int error, inlen, outlen, nodeid;
+ void *mh;
nodeid = rc_in->rc_header.h_nodeid;
inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
outlen = LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom);
- error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
+ error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
+ &rc, &mh);
if (error)
return;
rc->rc_id = rc_in->rc_id;
@@ -313,7 +354,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
- send_rcom(ls, mh, rc);
+ send_rcom_stateless(ls, mh, rc);
}
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
@@ -458,7 +499,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
char *mb;
int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
- mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
+ mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb);
if (!mh)
return -ENOBUFS;
@@ -479,7 +520,7 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
rf->rf_lvblen = cpu_to_le32(~0U);
dlm_rcom_out(rc);
- dlm_lowcomms_commit_buffer(mh);
+ dlm_midcomms_commit_mhandle(mh);
return 0;
}