[Lustre-devel] Queries regarding LDLM_ENQUEUE

Vilobh Meshram vilobh.meshram at gmail.com
Thu Oct 21 19:33:44 PDT 2010


Thanks, Andreas, for the e-mail.

I am trying to modify the LDLM_ENQUEUE RPC so that the reply carries an
additional buffer (say, the string "Hello World") filled in by the MDS. I
explained the use case in my last e-mail; please refer to my message sent
on 10/19.

I have attached the diff files. When I apply the attached changes I get a
kernel panic on the MDS. Can someone please suggest where I might be going
wrong?


Thanks,
Vilobh
Graduate Research Associate
Department of Computer Science
The Ohio State University, Columbus, Ohio


On Wed, Oct 20, 2010 at 3:55 AM, Andreas Dilger
<andreas.dilger at oracle.com>wrote:

> On 2010-10-19, at 20:04, Vilobh Meshram wrote:
> > We are trying to do the following. Please let me know if anything is not
> clear:
> >
> > Say we have two clients, C1 and C2, and an MDS. Say C1 and C2 share a file.
> > 1) When client C1 performs an open/create style request to the MDS, we
> want to follow the normal path that Lustre performs.
> > 2) Now say C2 tries to open the same file that was opened by C1.
> > 3) At the MDS end we maintain some data structure to scan and see if the
> file was already opened by some client (in this case C1 has opened this
> file).
> > 4) If the MDS finds that some client (C1 here) has already opened the
> file, then it sends the new client (C2 here) some information about the
> client that originally opened the file.
>
> While I understand the basic concept, I don't really see how your proposal
> will actually improve performance.  If C2 already has to contact the MDS and
> get a reply from it, then wouldn't it be about the same to simply perform
> the open as is done today?  The number of MDS RPCs is the same, and in fact
> this would avoid further message overhead between C1 and C2.
>
> > 5) Once C2 gets the information, it is up to C2 to take further actions.
> > 6) By this process we can save the time spent in the locking mechanism
> for C2. Basically, by maintaining some kind of data structure, we aim to
> bypass the Lustre locking scheme for files already opened by some client.
> >
> > Please let us know your thoughts on the above approach. Is this a
> feasible design, and moving ahead can we expect any complications?
>
> There is a separate proposal that has been underway in the Linux community
> for some time, to allow a user process to get a file handle (i.e. binary
> blob returned from a new name_to_handle() syscall) from the kernel for a
> given pathname, and then later use that file handle in another process to
> open a file descriptor without re-traversing the path.
>
> I've been thinking this would be very useful for Lustre (and MPI in
> general), and have tried to steer the Linux development in a direction that
> would allow this to happen.  Is this in line with what you are
> investigating?
>
> While this wouldn't eliminate the actual MDS open RPC (i.e. the
> LDLM_ENQUEUE you have been discussing), it could avoid the path traversal
> from each client, possibly saving {path_elements * num_clients} additional
> RPCs.
>
> > So considering the problem statement, I need a way for C2 to extract the
> information from the data structure maintained at the MDS. In order to do
> that, C2 will send a request with intent = create|open, which will be an
> LDLM_ENQUEUE RPC. I need to modify this RPC such that:
> > 1) I can enclose some additional buffer whose size is known to me.
> > 2) When we pack the reply at the MDS side we should be able to include
> this buffer in the reply message.
> > 3) At the client side we should be able to extract the buffer's contents
> from the reply message.
> >
> > As of now, I need help with the above three steps.
> >
> > Thanks,
> > Vilobh
> > Graduate Research Associate
> > Department of Computer Science
> > The Ohio State University Columbus Ohio
> >
> >
> > On Tue, Oct 19, 2010 at 6:53 PM, Andreas Dilger <
> andreas.dilger at oracle.com> wrote:
> > On 2010-10-19, at 14:28, Vilobh Meshram wrote:
> > > From my exploration it seems that for create/open requests
> LDLM_ENQUEUE is the RPC through which the client talks to the MDS. Please
> confirm this.
> > >
> > > Since I could figure out that LDLM_ENQUEUE is the only RPC to interface
> with the MDS, I am planning to send the LDLM_ENQUEUE RPC with some
> additional buffer from the client to the MDS, so that based on some
> specific condition the MDS can fill in the information in the buffer sent
> from the client.
> >
> > This isn't correct.  LDLM_ENQUEUE is used for enqueueing locks.  It just
> happens that when Lustre wants to create a new file it enqueues a lock on
> the parent directory with the "intent" to create a new file.  The MDS
> currently always replies "you cannot have the lock for the directory, I
> created the requested file for you".  Similarly, when the client is getting
> attributes on a file, it needs a lock on that file in order to cache the
> attributes, and to save RPCs the attributes are returned with the lock.
> >
> > > I have made some modifications to the code for the LDLM_ENQUEUE RPC,
> but I am getting kernel panics. Can someone please help me and suggest a
> good way to tackle this problem? I am using Lustre 1.8.1.1 and I cannot
> upgrade to Lustre 2.0.
> >
> > It would REALLY be a lot easier to have this discussion with you if you
> actually told us what it is you are working on.  Not only could we focus on
> the higher-level issue that you are trying to solve (instead of possibly
> wasting a lot of time focusing on a small issue that may in fact be
> completely irrelevant), but many ideas related to Lustre have probably
> already been discussed at length by the Lustre developers sometime over
> the past 8 years that we've been working on it.  I suspect that the
> readership of this list could probably give you a lot of assistance with
> whatever you are working on, if you will only tell us what it actually is
> you are trying to do.
> >
> > > On Mon, Oct 18, 2010 at 7:33 PM, Vilobh Meshram <
> vilobh.meshram at gmail.com> wrote:
> > >> Of the many RPCs used in Lustre, it seems that LDLM_ENQUEUE is the
> most frequently used RPC for communication between the client and the MDS.
> I have a few queries regarding it:
> > >>
> > >> 1) Is LDLM_ENQUEUE the only interface (RPC) through which the client
> can interact with the MDS for CREATE/OPEN kinds of requests?
> > >>
> > >> I tried a couple of experiments and found that LDLM_ENQUEUE comes
> into the picture while mounting the FS as well as when we do a lookup,
> create, or open of a file. I was expecting the MDS_REINT RPC to be invoked
> for a CREATE/OPEN request via mdc_create(), but it seems that Lustre
> invokes LDLM_ENQUEUE even for CREATE/OPEN (by packing the intent-related
> data).
> > >> Please correct me if I am wrong.
> > >>
> > >> 2) In which cases (for which system calls) will the MDS_REINT RPC be
> invoked?
> >
> >
> > Cheers, Andreas
> > --
> > Andreas Dilger
> > Lustre Technical Lead
> > Oracle Corporation Canada Inc.
> >
> >
>
>
> Cheers, Andreas
> --
> Andreas Dilger
> Lustre Technical Lead
> Oracle Corporation Canada Inc.
>
>
-------------- next part --------------
*** ./lustre/ldlm/ldlm_lockd.c	2010-10-21 17:49:05.000000000 -0400
--- ../fresh/lustre/ldlm/ldlm_lockd.c	2010-10-15 15:37:02.000000000 -0400
*************** int ldlm_handle_enqueue(struct ptlrpc_re
*** 997,1017 ****
          struct obd_device *obddev = req->rq_export->exp_obd;
          struct ldlm_reply *dlm_rep;
          struct ldlm_request *dlm_req;
!         __u32 size[4] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
          int rc = 0;
          __u32 flags;
          ldlm_error_t err = ELDLM_OK;
          struct ldlm_lock *lock = NULL;
          void *cookie = NULL;
-         int i;
-         char *str = "Hello World Sun";
-         char *str_target;
          ENTRY;
  
          LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
!         printk("\n Inside function %s server-side enqueue handler START",__func__);
!         for(i=0;i<3;i++) printk("\n Inside function %s size[%d]:%d",__func__,i,size[i]);
          dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                       lustre_swab_ldlm_request);
          if (dlm_req == NULL) {
--- 997,1013 ----
          struct obd_device *obddev = req->rq_export->exp_obd;
          struct ldlm_reply *dlm_rep;
          struct ldlm_request *dlm_req;
!         __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };
          int rc = 0;
          __u32 flags;
          ldlm_error_t err = ELDLM_OK;
          struct ldlm_lock *lock = NULL;
          void *cookie = NULL;
          ENTRY;
  
          LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
! 
          dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),
                                       lustre_swab_ldlm_request);
          if (dlm_req == NULL) {
*************** existing_lock:
*** 1126,1148 ****
                  int buffers = 2;
  
                  lock_res_and_lock(lock);
-                 printk("\n Exsisting lock lock->l_resource->lr_lvb_len:%u",lock->l_resource->lr_lvb_len);
                  if (lock->l_resource->lr_lvb_len) {
-                         printk("\n Inside function %s , inside condition lock->l_resource->lr_lvb_len so buffers=3",__func__);
                          size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                          buffers = 3;
                  }
-                 //size[DLM_REPLY_REC_OFF] = 16;
-                 //buffer = buffer + 1;
-                 if(lock->l_resource->lr_lvb_len == 0)
-                 {
-                      buffers++;
-                      size[DLM_REPLY_REC_OFF] = 0;
-                 }
-                 buffers++;
-                 size[DLM_REPLY_REC_OFF+1] = 16;
                  unlock_res_and_lock(lock);
!                 printk("\n Inside function %s , outside condition lock->l_resource->lr_lvb_len so buffers=2",__func__);
                  if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                          GOTO(out, rc = -ENOMEM);
  
--- 1122,1133 ----
                  int buffers = 2;
  
                  lock_res_and_lock(lock);
                  if (lock->l_resource->lr_lvb_len) {
                          size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
                          buffers = 3;
                  }
                  unlock_res_and_lock(lock);
! 
                  if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                          GOTO(out, rc = -ENOMEM);
  
*************** existing_lock:
*** 1156,1164 ****
          if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                  lock->l_req_extent = lock->l_policy_data.l_extent;
  
-         printk("%s: \twill do lock-enq...\n", __func__);
          err = ldlm_lock_enqueue(obddev->obd_namespace, &lock, cookie, (int *)&flags);
-         printk("%s: \tafter lock-enq...\n", __func__);
          if (err)
                  GOTO(out, err);
  
--- 1141,1147 ----
*************** existing_lock:
*** 1178,1185 ****
          dlm_rep->lock_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
          lock->l_flags |= dlm_req->lock_flags & LDLM_INHERIT_FLAGS;
  
-         str_target = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF+1,16);
-         memcpy(str_target,str,16);
          /* Don't move a pending lock onto the export if it has already
           * been evicted.  Cancel it now instead. (bug 5683) */
          if (req->rq_export->exp_failed ||
--- 1161,1166 ----
*************** existing_lock:
*** 1232,1238 ****
  
          EXIT;
   out:
-         printk("\n [VM] Inside function %s got a hit at out",__func__);
          req->rq_status = rc ?: err;  /* return either error - bug 11190 */
          if (!req->rq_packed_final) {
                  err = lustre_pack_reply(req, 1, NULL, NULL);
--- 1213,1218 ----
*************** existing_lock:
*** 1248,1257 ****
  
                  if (rc == 0) {
                          lock_res_and_lock(lock);
-                         printk("\n Inside function %s , inside if condition rc=0 the place where we do a memcpy for offset = DLM_REPLY_REC_OFF",__func__);
                          size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;
-                         printk("\n Inside function %s , size[DLM_REPLY_REC_OFF] : %u , lock->l_resource->lr_lvb_len :%u",__func__,size[DLM_REPLY_REC_OFF],lock->l_resource->lr_lvb_len);
-                         size[DLM_REPLY_REC_OFF+1]= 16;
                          if (size[DLM_REPLY_REC_OFF] > 0) {
                                  void *lvb = lustre_msg_buf(req->rq_repmsg,
                                                         DLM_REPLY_REC_OFF,
--- 1228,1234 ----
*************** existing_lock:
*** 1264,1270 ****
                          }
                          unlock_res_and_lock(lock);
                  } else {
-                         printk("\n Inside function %s , inside else condition rc=0 the place where we do a memcpy for offset = DLM_REPLY_REC_OFF",__func__);
                          lock_res_and_lock(lock);
                          ldlm_resource_unlink_lock(lock);
                          ldlm_lock_destroy_nolock(lock);
--- 1241,1246 ----
-------------- next part --------------
*** ./lustre/ldlm/ldlm_request.c	2010-10-21 22:26:28.000000000 -0400
--- ../fresh/lustre/ldlm/ldlm_request.c	2010-10-15 15:37:02.000000000 -0400
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 389,395 ****
          int cleanup_phase = 1;
          ENTRY;
  
-         printk("\n Inside function %s",__func__);
          lock = ldlm_handle2lock(lockh);
          /* ldlm_cli_enqueue is holding a reference on this lock. */
          if (!lock) {
--- 389,394 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 401,407 ****
                  LASSERT(!is_replay);
                  LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                             rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
-                 printk("\n Inside %s if client lock aborted or failed",__func__);
                  if (rc == ELDLM_LOCK_ABORTED) {
                          /* Before we return, swab the reply */
                          reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
--- 400,405 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 433,440 ****
                  GOTO(cleanup, rc = -EPROTO);
          }
  
-         printk("\n Inside function %s we have received a reply",__func__);
- 
          /* lock enqueued on the server */
          cleanup_phase = 0;
  
--- 431,436 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 463,469 ****
           * again. */
          if ((*flags) & LDLM_FL_LOCK_CHANGED) {
                  int newmode = reply->lock_desc.l_req_mode;
-                 printk("\n Inside function %s in condition (*flags) & LDLM_FL_LOCK_CHANGED)",__func__);
                  LASSERT(!is_replay);
                  if (newmode && newmode != lock->l_req_mode) {
                          LDLM_DEBUG(lock, "server returned different mode %s",
--- 459,464 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 504,510 ****
               * because it cannot handle asynchronous ASTs robustly (see
               * bug 7311). */
              (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
-                 printk("\n Inside function %s in condition ((*flags) & LDLM_FL_AST_SENT ||(LIBLUSTRE_CLIENT && type == LDLM_EXTENT))",__func__);
                  lock_res_and_lock(lock);
                  lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                  unlock_res_and_lock(lock);
--- 499,504 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 515,521 ****
           * clobber the LVB with an older one. */
          if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                  void *tmplvb;
-                 printk("\n Inside function %s in condition lvb_len && (lock->l_req_mode != lock->l_granted_mode) , lvb_len:%d",__func__,lvb_len);
                  tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
                                              lvb_swabber);
                  if (tmplvb == NULL)
--- 509,514 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 524,530 ****
          }
  
          if (!is_replay) {
-                 printk("\n Inside function %s in condition !is_replay",__func__);
                  rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
                  if (lock->l_completion_ast != NULL) {
                          int err = lock->l_completion_ast(lock, *flags, NULL);
--- 517,522 ----
*************** int ldlm_cli_enqueue_fini(struct obd_exp
*** 536,542 ****
          }
  
          if (lvb_len && lvb != NULL) {
-                 printk("\n Inside function %s in condition lvb_len && lvb != NULL",__func__);
                  /* Copy the LVB here, and not earlier, because the completion
                   * AST (if any) can override what we got in the reply */
                  memcpy(lvb, lock->l_lvb_data, lvb_len);
--- 528,533 ----
*************** static inline int ldlm_req_handles_avail
*** 560,578 ****
                                           __u32 *size, int bufcount, int off)
  {
          int avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512);
!         printk("\n Inside function %s",__func__);
!         printk("\n avail--before = %d",avail); 
          avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                   bufcount, size);
!         printk("\n avail--after  = %d",avail);
!         if (likely(avail >= 0)){
                  avail /= (int)sizeof(struct lustre_handle);
-                 printk("\n avail--likely = %d",avail);
-         }
          else
                  avail = 0;
          avail += LDLM_LOCKREQ_HANDLES - off;
!         printk("\n avail--lats = %d",avail);
          return avail;
  }
  
--- 551,565 ----
                                           __u32 *size, int bufcount, int off)
  {
          int avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512);
! 
          avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                   bufcount, size);
!         if (likely(avail >= 0))
                  avail /= (int)sizeof(struct lustre_handle);
          else
                  avail = 0;
          avail += LDLM_LOCKREQ_HANDLES - off;
! 
          return avail;
  }
  
*************** struct ptlrpc_request *ldlm_prep_elc_req
*** 597,622 ****
          CFS_LIST_HEAD(head);
          ENTRY;
  
-         printk("\n Inside function %s, opc=%d",__func__, opc);
          if (cancels == NULL)
                  cancels = &head;
          if (exp_connect_cancelset(exp)) {
                  /* Estimate the amount of free space in the request. */
-                 printk("\n Inside exp_connect_cancelset(exp) in func %s",__func__);
                  LASSERT(bufoff < bufcount);
  
                  avail = ldlm_req_handles_avail(exp, size, bufcount, canceloff);
-                 printk("\n In function %s avail = %d",__func__,avail);
                  flags = ns_connect_lru_resize(ns) ?
                          LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
-                 printk("\n In function %s ns_connect_lru_resize(ns) :%d",__func__,ns_connect_lru_resize(ns));
                  to_free = !ns_connect_lru_resize(ns) &&
                            opc == LDLM_ENQUEUE ? 1 : 0;
  
                  /* Cancel lru locks here _only_ if the server supports
                   * EARLY_CANCEL. Otherwise we have to send extra CANCEL
                   * rpc, what will make us slower. */
-                 printk("\n In function %s count = %d",__func__,count);
                  if (avail > count)
                          count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                         avail - count, 0, flags);
--- 584,604 ----
*************** struct ptlrpc_request *ldlm_prep_elc_req
*** 624,632 ****
                          pack = count;
                  else
                          pack = avail;
-                 printk("\n In function %s pack = %d",__func__,pack);
                  size[bufoff] = ldlm_request_bufsize(pack, opc);
-                 printk("\n In function %s , bufoff : %d , size[bufoff]= %u",__func__,bufoff,size[bufoff]);
          }
  
          req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
--- 606,612 ----
*************** struct ptlrpc_request *ldlm_prep_enqueue
*** 657,663 ****
                                               struct list_head *cancels,
                                               int count)
  {
-         printk("\n Inside function %s \n",__func__);
          return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                   bufcount, size, DLM_LOCKREQ_OFF,
                                   LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
--- 637,642 ----
*************** int ldlm_cli_enqueue(struct obd_export *
*** 679,697 ****
          struct ldlm_lock *lock;
          struct ldlm_request *body;
          struct ldlm_reply *reply;
!         __u32 size[4] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREQ_OFF]     = sizeof(*body),
                          [DLM_REPLY_REC_OFF]   = lvb_len ? lvb_len :
!                                                 sizeof(struct ost_lvb),
!                         [DLM_REPLY_REC_OFF+1] = 16};
          int is_replay = *flags & LDLM_FL_REPLAY;
          int req_passed_in = 1, rc, err;
          struct ptlrpc_request *req;
-         int i;
          ENTRY;
  
-         printk("\n Inside function %s \n",__func__);
-         for(i=0;i<4;i++) printk("\n size[%d] : %d",i,size[i]);
          LASSERT(exp != NULL);
  
          /* If we're replaying this lock, just check some invariants.
--- 658,672 ----
          struct ldlm_lock *lock;
          struct ldlm_request *body;
          struct ldlm_reply *reply;
!         __u32 size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREQ_OFF]     = sizeof(*body),
                          [DLM_REPLY_REC_OFF]   = lvb_len ? lvb_len :
!                                                 sizeof(struct ost_lvb) };
          int is_replay = *flags & LDLM_FL_REPLAY;
          int req_passed_in = 1, rc, err;
          struct ptlrpc_request *req;
          ENTRY;
  
          LASSERT(exp != NULL);
  
          /* If we're replaying this lock, just check some invariants.
*************** int ldlm_cli_enqueue(struct obd_export *
*** 700,706 ****
                  lock = ldlm_handle2lock(lockh);
                  LASSERT(lock != NULL);
                  LDLM_DEBUG(lock, "client-side enqueue START");
-                 printk("\n Client-side enqueue START in %s",__func__);
                  LASSERT(exp == lock->l_conn_export);
          } else {
                  lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
--- 675,680 ----
*************** int ldlm_cli_enqueue(struct obd_export *
*** 736,742 ****
          /* lock not sent to server yet */
  
          if (reqp == NULL || *reqp == NULL) {
!                 req = ldlm_prep_enqueue_req(exp,3, size, NULL, 0);
                  if (req == NULL) {
                          failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                          LDLM_LOCK_PUT(lock);
--- 710,716 ----
          /* lock not sent to server yet */
  
          if (reqp == NULL || *reqp == NULL) {
!                 req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                  if (req == NULL) {
                          failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                          LDLM_LOCK_PUT(lock);
*************** int ldlm_cli_enqueue(struct obd_export *
*** 746,752 ****
                  if (reqp)
                          *reqp = req;
          } else {
-                 printk("\n [VM]got a hit at case where reqp is not NULL in %s",__func__);
                  req = *reqp;
                  LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
                           sizeof(*body), "buflen[%d] = %d, not %d\n",
--- 720,725 ----
*************** int ldlm_cli_enqueue(struct obd_export *
*** 768,774 ****
          /* Continue as normal. */
          if (!req_passed_in) {
                  size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
!                 ptlrpc_req_set_repsize(req, 4, size);
          }
  
          /*
--- 741,747 ----
          /* Continue as normal. */
          if (!req_passed_in) {
                  size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
!                 ptlrpc_req_set_repsize(req, 3, size);
          }
  
          /*
*************** int ldlm_cli_enqueue(struct obd_export *
*** 784,793 ****
                  RETURN(0);
          }
  
-         printk("\n in --func-- %s SENDING REQUEST",__func__);
          LDLM_DEBUG(lock, "sending request");
          rc = ptlrpc_queue_wait(req);
-         printk("\n in --func-- %s REQUEST SENT after ptlrpc_queue_wait",__func__);
          err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
                                      einfo->ei_mode, flags, lvb, lvb_len,
                                      lvb_swabber, lockh, rc);
--- 757,764 ----
-------------- next part --------------
*** ./lustre/mdc/mdc_locks.c	2010-10-20 20:58:51.000000000 -0400
--- ../fresh/lustre/mdc/mdc_locks.c	2010-10-15 15:37:15.000000000 -0400
*************** static struct ptlrpc_request *mdc_intent
*** 252,264 ****
          int repbufcount = 5;
          int mode;
          int rc;
-         int i;
          ENTRY;
  
-         printk("\n Inside function %s",__func__);
-         for(i=0;i<6;i++) printk("\n size[%d] : %d",i,size[i]);
-         for(i=0;i<5;i++) printk("\n repsize[%d] : %d",i,repsize[i]);
- 
          it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
          if (mdc_exp_is_2_0_server(exp)) {
                  size[DLM_INTENT_REC_OFF] = sizeof(struct mdt_rec_create);
--- 252,259 ----
*************** static struct ptlrpc_request *mdc_intent
*** 381,405 ****
          struct ptlrpc_request *req;
          struct ldlm_intent *lit;
          struct obd_device *obddev = class_exp2obd(exp);
!         __u32 size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                          [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                          [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
                          [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
!                         [DLM_INTENT_REC_OFF+2]= 0,
!                         [DLM_INTENT_REC_OFF+3]= 16 };
!         __u32 repsize[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                             [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                             [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                             [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                          cl_max_mds_easize,
                             [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
!                            [DLM_REPLY_REC_OFF+3] = 0,
!                            [DLM_REPLY_REC_OFF+4] = 16 };
          obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
                            OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
!         int bufcount = 6;
!         int i=0;
          ENTRY;
  
          if (mdc_exp_is_2_0_server(exp)) {
--- 376,397 ----
          struct ptlrpc_request *req;
          struct ldlm_intent *lit;
          struct obd_device *obddev = class_exp2obd(exp);
!         __u32 size[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                          [DLM_LOCKREQ_OFF]     = sizeof(struct ldlm_request),
                          [DLM_INTENT_IT_OFF]   = sizeof(*lit),
                          [DLM_INTENT_REC_OFF]  = sizeof(struct mdt_body),
                          [DLM_INTENT_REC_OFF+1]= data->namelen + 1,
!                         [DLM_INTENT_REC_OFF+2]= 0 };
!         __u32 repsize[6] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                             [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                             [DLM_REPLY_REC_OFF]   = sizeof(struct mdt_body),
                             [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
                                                          cl_max_mds_easize,
                             [DLM_REPLY_REC_OFF+2] = LUSTRE_POSIX_ACL_MAX_SIZE,
!                            [DLM_REPLY_REC_OFF+3] = 0 };
          obd_valid valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE | OBD_MD_FLACL |
                            OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA;
!         int bufcount = 5;
          ENTRY;
  
          if (mdc_exp_is_2_0_server(exp)) {
*************** static struct ptlrpc_request *mdc_intent
*** 407,418 ****
                  size[DLM_INTENT_REC_OFF+2] = data->namelen + 1;
                  bufcount = 6;
          }
-        
-         printk("%s: prep-enq-req: bufcnt=%d\n", __func__, bufcount); 
-         for(i=0; i<bufcount; i++) {      
-                 printk("\tsize[%d]=%u\n", i,size[i] );
-                 printk("\trepsize[%d]=%u\n", i,repsize[i] );
-         }
          req = ldlm_prep_enqueue_req(exp, bufcount, size, NULL, 0);
          if (req) {
                  /* pack the intent */
--- 399,404 ----
*************** static int mdc_finish_enqueue(struct obd
*** 455,461 ****
          struct ldlm_reply *lockrep;
          ENTRY;
  
-         printk("\n Inside function %s",__func__);
          LASSERT(rc >= 0);
          /* Similarly, if we're going to replay this request, we don't want to
           * actually get a lock, just perform the intent. */
--- 441,446 ----
*************** static int mdc_finish_enqueue(struct obd
*** 517,523 ****
          /* We know what to expect, so we do any byte flipping required here */
          if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                  struct mds_body *body;
!                 printk("\n Inside function %s inside condition IT_OPEN , IT_LOOKUP , IT_GETATTR",__func__);
                  body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                           lustre_swab_mds_body);
                  if (body == NULL) {
--- 502,508 ----
          /* We know what to expect, so we do any byte flipping required here */
          if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
                  struct mds_body *body;
! 
                  body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body),
                                           lustre_swab_mds_body);
                  if (body == NULL) {
*************** int mdc_enqueue(struct obd_export *exp, 
*** 587,593 ****
          int rc;
          ENTRY;
  
-         printk("\n Inside function %s \n",__func__);
          fid_build_reg_res_name((void *)&data->fid1, &res_id);
          LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
          if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR))
--- 572,577 ----
*************** int mdc_intent_getattr_async(struct obd_
*** 924,933 ****
          int                      flags = LDLM_FL_HAS_INTENT;
          ENTRY;
  
-         printk("%s: name: %.*s in inode "LPU64", intent: %s flags %#o\n",__func__,
-                op_data->namelen, op_data->name, op_data->fid1.id,
-                ldlm_it2str(it->it_op), it->it_flags);
- 
          CDEBUG(D_DLMTRACE,"name: %.*s in inode "LPU64", intent: %s flags %#o\n",
                 op_data->namelen, op_data->name, op_data->fid1.id,
                 ldlm_it2str(it->it_op), it->it_flags);
--- 908,913 ----

