Skip site navigation (1)Skip section navigation (2)
Date:      Thu, 15 Aug 2002 06:29:57 -0700
From:      Terry Lambert <tlambert2@mindspring.com>
To:        hackers@freebsd.org, jlemon@freebsd.org
Subject:   PATCHES: Support for kqueue for System V message queues
Message-ID:  <3D5BACD5.405A71DF@mindspring.com>

next in thread | raw e-mail | index | archive | help
This is a multi-part message in MIME format.
--------------AB3423182009884E14B98646
Content-Type: text/plain; charset=us-ascii
Content-Transfer-Encoding: 7bit

I got really tired of this support not being there, so I have added
it, with these patches (attached -- for weenies, I have included
a unidiff, at the end, but you should really use the context diff
when you port the code to -current... ;-)).

The "data" part returns the number of messages pending on the
message queue being filtered on (sorry, but returning the message
type is left as an exercise for the student, since the note value
is muxed in with the hint, and there's not a third parameter to KNOTE(),
knote(), or f_event()).

I have included an example receiver, and an example sender.

The most interesting test is to send a couple of messages,
and then start the receiver, and send another one.

Note that I do not trigger an event for messages already in
the queue, as this would have required substantial changes to
the number of members in the f_ops vector, and refactoring of
much of the code in kern_event.c to take the new entry vectors
into account.  Basically, it acts like signals (EV_CLEAR is
set automatically on f_attach(), but can be explicitly reset
with a second call).

-- Terry
--------------AB3423182009884E14B98646
Content-Type: text/plain; charset=us-ascii;
 name="kqueue.diff"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
 filename="kqueue.diff"

Index: kern/kern_event.c
===================================================================
RCS file: /usr/cvs/src/sys/kern/kern_event.c,v
retrieving revision 1.2.2.8
diff -c -r1.2.2.8 kern_event.c
*** kern/kern_event.c	14 Dec 2001 19:24:42 -0000	1.2.2.8
--- kern/kern_event.c	15 Aug 2002 11:33:50 -0000
***************
*** 122,127 ****
--- 122,128 ----
  
  extern struct filterops aio_filtops;
  extern struct filterops sig_filtops;
+ extern struct filterops sysvmsg_filtops;
  
  /*
   * Table for for all system-defined filters.
***************
*** 134,139 ****
--- 135,141 ----
  	&proc_filtops,			/* EVFILT_PROC */
  	&sig_filtops,			/* EVFILT_SIGNAL */
  	&timer_filtops,			/* EVFILT_TIMER */
+ 	&sysvmsg_filtops,		/* EVFILT_SYSVMSG */
  };
  
  static int
Index: kern/sysv_msg.c
===================================================================
RCS file: /usr/cvs/src/sys/kern/sysv_msg.c,v
retrieving revision 1.23.2.3
diff -c -r1.23.2.3 sysv_msg.c
*** kern/sysv_msg.c	1 Nov 2000 17:58:06 -0000	1.23.2.3
--- kern/sysv_msg.c	15 Aug 2002 13:37:16 -0000
***************
*** 40,45 ****
--- 40,46 ----
  #undef MSG_DEBUG_OK
  
  static void msg_freehdr __P((struct msg *msghdr));
+ static struct msqid_ds *msqid_to_msqptr __P((int umsqid));
  
  /* XXX casting to (sy_call_t *) is bogus, as usual. */
  static sy_call_t *msgcalls[] = {
***************
*** 112,118 ****
      				/* 0..(MSGSEG-1) -> index of next segment */
  };
  
! #define MSG_LOCKED	01000	/* Is this msqid_ds locked? */
  
  static int nfree_msgmaps;	/* # of free map entries */
  static short free_msgmaps;	/* head of linked list of free map entries */
--- 113,124 ----
      				/* 0..(MSGSEG-1) -> index of next segment */
  };
  
! struct i_msqid_ds {
! 	struct msqid_ds	e;		/* externally visible */
! 	struct klist	q_klist;	/* filters on this message queue */
! };
! 
! #define MSG_LOCKED	01000	/* Is this i_msqid_ds locked? */
  
  static int nfree_msgmaps;	/* # of free map entries */
  static short free_msgmaps;	/* head of linked list of free map entries */
***************
*** 120,126 ****
  static char *msgpool;		/* MSGMAX byte long msg buffer pool */
  static struct msgmap *msgmaps;	/* MSGSEG msgmap structures */
  static struct msg *msghdrs;	/* MSGTQL msg headers */
! static struct msqid_ds *msqids;	/* MSGMNI msqid_ds struct's */
  
  static void
  msginit(dummy)
--- 126,132 ----
  static char *msgpool;		/* MSGMAX byte long msg buffer pool */
  static struct msgmap *msgmaps;	/* MSGSEG msgmap structures */
  static struct msg *msghdrs;	/* MSGTQL msg headers */
! static struct i_msqid_ds *i_msqids;	/* MSGMNI i_msqid_ds struct's */
  
  static void
  msginit(dummy)
***************
*** 137,145 ****
  	msghdrs = malloc(sizeof(struct msg) * msginfo.msgtql, M_MSG, M_WAITOK);
  	if (msghdrs == NULL)
  		panic("msghdrs is NULL");
! 	msqids = malloc(sizeof(struct msqid_ds) * msginfo.msgmni, M_MSG, M_WAITOK);
! 	if (msqids == NULL)
! 		panic("msqids is NULL");
  
  	/*
  	 * msginfo.msgssz should be a power of two for efficiency reasons.
--- 143,151 ----
  	msghdrs = malloc(sizeof(struct msg) * msginfo.msgtql, M_MSG, M_WAITOK);
  	if (msghdrs == NULL)
  		panic("msghdrs is NULL");
! 	i_msqids = malloc(sizeof(struct i_msqid_ds) * msginfo.msgmni, M_MSG, M_WAITOK);
! 	if (i_msqids == NULL)
! 		panic("i_msqids is NULL");
  
  	/*
  	 * msginfo.msgssz should be a power of two for efficiency reasons.
***************
*** 183,195 ****
      	}
  	free_msghdrs = &msghdrs[0];
  
! 	if (msqids == NULL)
! 		panic("msqids is NULL");
  
  	for (i = 0; i < msginfo.msgmni; i++) {
! 		msqids[i].msg_qbytes = 0;	/* implies entry is available */
! 		msqids[i].msg_perm.seq = 0;	/* reset to a known value */
! 		msqids[i].msg_perm.mode = 0;
  	}
  }
  SYSINIT(sysv_msg, SI_SUB_SYSV_MSG, SI_ORDER_FIRST, msginit, NULL)
--- 189,201 ----
      	}
  	free_msghdrs = &msghdrs[0];
  
! 	if (i_msqids == NULL)
! 		panic("i_msqids is NULL");
  
  	for (i = 0; i < msginfo.msgmni; i++) {
! 		i_msqids[i].e.msg_qbytes = 0;	/* implies entry is available */
! 		i_msqids[i].e.msg_perm.seq = 0;	/* reset to a known value */
! 		i_msqids[i].e.msg_perm.mode = 0;
  	}
  }
  SYSINIT(sysv_msg, SI_SUB_SYSV_MSG, SI_ORDER_FIRST, msginit, NULL)
***************
*** 243,248 ****
--- 249,288 ----
  	free_msghdrs = msghdr;
  }
  
+ static struct msqid_ds *	/* map a user msqid to its queue, or NULL */
+ msqid_to_msqptr(umsqid)
+ 	int umsqid;
+ {
+ 	int msqid;
+ 	struct msqid_ds *msqptr = NULL;	/* NULL: id does not name a live queue */
+ 
+ 	msqid = IPCID_TO_IX(umsqid);
+ 
+ 	if (msqid < 0 || msqid >= msginfo.msgmni) {
+ #ifdef MSG_DEBUG_OK
+ 		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
+ 		    msginfo.msgmni);
+ #endif
+ 	} else {
+ 
+ 		msqptr = (struct msqid_ds *)&i_msqids[msqid];
+ 
+ 		if (msqptr->msg_qbytes == 0) {
+ #ifdef MSG_DEBUG_OK
+ 			printf("no such msqid\n");
+ #endif
+ 			msqptr = NULL;
+ 		} else if (msqptr->msg_perm.seq != IPCID_TO_SEQ(umsqid)) {
+ #ifdef MSG_DEBUG_OK
+ 			printf("wrong sequence number\n");
+ #endif
+ 			msqptr = NULL;	/* reject even when MSG_DEBUG_OK is off */
+ 		}
+ 	}
+ 
+ 	return(msqptr);
+ }
+ 
  #ifndef _SYS_SYSPROTO_H_
  struct msgctl_args {
  	int	msqid;
***************
*** 256,262 ****
  	struct proc *p;
  	register struct msgctl_args *uap;
  {
- 	int msqid = uap->msqid;
  	int cmd = uap->cmd;
  	struct msqid_ds *user_msqptr = uap->buf;
  	int rval, eval;
--- 296,301 ----
***************
*** 270,299 ****
  	if (!jail_sysvipc_allowed && p->p_prison != NULL)
  		return (ENOSYS);
  
! 	msqid = IPCID_TO_IX(msqid);
! 
! 	if (msqid < 0 || msqid >= msginfo.msgmni) {
! #ifdef MSG_DEBUG_OK
! 		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
! 		    msginfo.msgmni);
! #endif
  		return(EINVAL);
- 	}
- 
- 	msqptr = &msqids[msqid];
- 
- 	if (msqptr->msg_qbytes == 0) {
- #ifdef MSG_DEBUG_OK
- 		printf("no such msqid\n");
- #endif
- 		return(EINVAL);
- 	}
- 	if (msqptr->msg_perm.seq != IPCID_TO_SEQ(uap->msqid)) {
- #ifdef MSG_DEBUG_OK
- 		printf("wrong sequence number\n");
- #endif
- 		return(EINVAL);
- 	}
  
  	eval = 0;
  	rval = 0;
--- 309,316 ----
  	if (!jail_sysvipc_allowed && p->p_prison != NULL)
  		return (ENOSYS);
  
! 	if ((msqptr = msqid_to_msqptr(uap->msqid)) == NULL)
  		return(EINVAL);
  
  	eval = 0;
  	rval = 0;
***************
*** 305,310 ****
--- 322,332 ----
  		struct msg *msghdr;
  		if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_M)))
  			return(eval);
+ 
+ 		/* notify intent before actually removing message queue */
+ 		KNOTE(&((struct i_msqid_ds *)msqptr)->q_klist,
+ 						NOTE_IPC_RMID | uap->msqid);
+ 
  		/* Free the message headers */
  		msghdr = msqptr->msg_first;
  		while (msghdr != NULL) {
***************
*** 411,417 ****
  
  	if (key != IPC_PRIVATE) {
  		for (msqid = 0; msqid < msginfo.msgmni; msqid++) {
! 			msqptr = &msqids[msqid];
  			if (msqptr->msg_qbytes != 0 &&
  			    msqptr->msg_perm.key == key)
  				break;
--- 433,439 ----
  
  	if (key != IPC_PRIVATE) {
  		for (msqid = 0; msqid < msginfo.msgmni; msqid++) {
! 			msqptr = (struct msqid_ds *)&i_msqids[msqid];
  			if (msqptr->msg_qbytes != 0 &&
  			    msqptr->msg_perm.key == key)
  				break;
***************
*** 448,454 ****
  			 * they are copying the message in/out.  We can't
  			 * re-use the entry until they release it.
  			 */
! 			msqptr = &msqids[msqid];
  			if (msqptr->msg_qbytes == 0 &&
  			    (msqptr->msg_perm.mode & MSG_LOCKED) == 0)
  				break;
--- 470,476 ----
  			 * they are copying the message in/out.  We can't
  			 * re-use the entry until they release it.
  			 */
! 			msqptr = (struct msqid_ds *)&i_msqids[msqid];
  			if (msqptr->msg_qbytes == 0 &&
  			    (msqptr->msg_perm.mode & MSG_LOCKED) == 0)
  				break;
***************
*** 480,485 ****
--- 502,508 ----
  		msqptr->msg_stime = 0;
  		msqptr->msg_rtime = 0;
  		msqptr->msg_ctime = time_second;
+ 		bzero(&((struct i_msqid_ds *)msqptr)->q_klist, sizeof(struct klist));
  	} else {
  #ifdef MSG_DEBUG_OK
  		printf("didn't find it and wasn't asked to create it\n");
***************
*** 507,513 ****
  	struct proc *p;
  	register struct msgsnd_args *uap;
  {
- 	int msqid = uap->msqid;
  	void *user_msgp = uap->msgp;
  	size_t msgsz = uap->msgsz;
  	int msgflg = uap->msgflg;
--- 530,535 ----
***************
*** 515,520 ****
--- 537,543 ----
  	register struct msqid_ds *msqptr;
  	register struct msg *msghdr;
  	short next;
+ 	int s;
  
  #ifdef MSG_DEBUG_OK
  	printf("call to msgsnd(%d, 0x%x, %d, %d)\n", msqid, user_msgp, msgsz,
***************
*** 524,552 ****
  	if (!jail_sysvipc_allowed && p->p_prison != NULL)
  		return (ENOSYS);
  
! 	msqid = IPCID_TO_IX(msqid);
! 
! 	if (msqid < 0 || msqid >= msginfo.msgmni) {
! #ifdef MSG_DEBUG_OK
! 		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
! 		    msginfo.msgmni);
! #endif
  		return(EINVAL);
- 	}
- 
- 	msqptr = &msqids[msqid];
- 	if (msqptr->msg_qbytes == 0) {
- #ifdef MSG_DEBUG_OK
- 		printf("no such message queue id\n");
- #endif
- 		return(EINVAL);
- 	}
- 	if (msqptr->msg_perm.seq != IPCID_TO_SEQ(uap->msqid)) {
- #ifdef MSG_DEBUG_OK
- 		printf("wrong sequence number\n");
- #endif
- 		return(EINVAL);
- 	}
  
  	if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_W))) {
  #ifdef MSG_DEBUG_OK
--- 547,554 ----
  	if (!jail_sysvipc_allowed && p->p_prison != NULL)
  		return (ENOSYS);
  
! 	if ((msqptr = msqid_to_msqptr(uap->msqid)) == NULL)
  		return(EINVAL);
  
  	if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_W))) {
  #ifdef MSG_DEBUG_OK
***************
*** 812,817 ****
--- 814,824 ----
  	msqptr->msg_lspid = p->p_pid;
  	msqptr->msg_stime = time_second;
  
+ 	s = splhigh();
+ 	KNOTE(&((struct i_msqid_ds *)msqptr)->q_klist,
+ 						NOTE_SYSVMSG | uap->msqid);
+ 	splx(s);
+ 
  	wakeup((caddr_t)msqptr);
  	p->p_retval[0] = 0;
  	return(0);
***************
*** 832,838 ****
  	struct proc *p;
  	register struct msgrcv_args *uap;
  {
- 	int msqid = uap->msqid;
  	void *user_msgp = uap->msgp;
  	size_t msgsz = uap->msgsz;
  	long msgtyp = uap->msgtyp;
--- 839,844 ----
***************
*** 851,879 ****
  	if (!jail_sysvipc_allowed && p->p_prison != NULL)
  		return (ENOSYS);
  
! 	msqid = IPCID_TO_IX(msqid);
! 
! 	if (msqid < 0 || msqid >= msginfo.msgmni) {
! #ifdef MSG_DEBUG_OK
! 		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
! 		    msginfo.msgmni);
! #endif
! 		return(EINVAL);
! 	}
! 
! 	msqptr = &msqids[msqid];
! 	if (msqptr->msg_qbytes == 0) {
! #ifdef MSG_DEBUG_OK
! 		printf("no such message queue id\n");
! #endif
  		return(EINVAL);
- 	}
- 	if (msqptr->msg_perm.seq != IPCID_TO_SEQ(uap->msqid)) {
- #ifdef MSG_DEBUG_OK
- 		printf("wrong sequence number\n");
- #endif
- 		return(EINVAL);
- 	}
  
  	if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_R))) {
  #ifdef MSG_DEBUG_OK
--- 857,864 ----
  	if (!jail_sysvipc_allowed && p->p_prison != NULL)
  		return (ENOSYS);
  
! 	if ((msqptr = msqid_to_msqptr(uap->msqid)) == NULL)
  		return(EINVAL);
  
  	if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_R))) {
  #ifdef MSG_DEBUG_OK
***************
*** 1099,1101 ****
--- 1084,1176 ----
  	p->p_retval[0] = msgsz;
  	return(0);
  }
+ 
+ static int
+ filt_sysvmsgattach(struct knote *kn)
+ {
+ 	int msqid = kn->kn_id;
+ 	register struct i_msqid_ds *i_msqptr;
+ 
+ 	if ((i_msqptr = (struct i_msqid_ds *)msqid_to_msqptr(msqid)) == NULL)
+ 		return (ESRCH);
+ 
+ 	kn->kn_flags |= EV_CLEAR;		/* automatically set */
+ 
+ 	/* XXX locking?  this might compete with another process. */
+ 	SLIST_INSERT_HEAD(&i_msqptr->q_klist, kn, kn_selnext);
+ 
+ 	return (0);
+ }
+ 
+ /*
+  * The knote may be attached to a message queue which is deleted out
+  * from under us by another process, leaving nothing for the knote to
+  * be attached to.  So when the message queue is deleted, the knote
+  * is marked as DETACHED and also flagged as ONESHOT so it will be
+  * deleted when read out.  However, as part of the knote deletion,
+  * this routine is called, so a check is needed to avoid actually
+  * performing a detach, because the original message queue does not
+  * exist any more.  Note that reusing the queue ID will bzero the list
+  * head, orphaning the events which were linked to it, so this does
+  * not have to be tracked (though it seems a bit messy, this is what
+  * kqueue already does for exiting processes, FWIW).
+  */
+ static void
+ filt_sysvmsgdetach(struct knote *kn)
+ {
+ 	int msqid = kn->kn_id;
+ 	register struct i_msqid_ds *i_msqptr;
+ 
+ 	if (kn->kn_status & KN_DETACHED)
+ 		return;
+ 
+ 	if ((i_msqptr = (struct i_msqid_ds *)msqid_to_msqptr(msqid)) == NULL)
+ 		return;
+ 
+ 	/* XXX locking?  this might compete with another process. */
+ 	SLIST_REMOVE(&i_msqptr->q_klist, kn, knote, kn_selnext);
+ }
+ 
+ /*
+  * Handle events on a given message queue object; called once for
+  * each object.
+  */
+ static int
+ filt_sysvmsg(struct knote *kn, long hint)
+ {
+ 	u_int event;
+ 	int msqid;
+ 	register struct msqid_ds *msqptr;
+ 
+ 	/*
+ 	 * mask data out of "hint".
+ 	 */
+ 	event = (u_int)(hint & NOTE_SVMCMASK);
+ 	msqid = (int)(hint & NOTE_SVMDMASK);
+ 
+ 	if ((msqptr = msqid_to_msqptr(msqid)) == NULL)
+ 		return(0);
+ 
+ 	/*
+ 	 * if the user is interested in this event, record it.
+ 	 */
+ 	if (kn->kn_sfflags & event)
+ 		kn->kn_fflags |= event;
+ 
+ 	switch( event) {
+ 	case NOTE_IPC_RMID:	/* message queue is being removed */
+ 		/* flag the event as finished */
+ 		kn->kn_status |= KN_DETACHED;
+ 		kn->kn_flags |= (EV_EOF | EV_ONESHOT); 
+ 		break;
+ 
+ 	case NOTE_SYSVMSG:	/* a message was enqueued */
+ 		kn->kn_data = msqptr->msg_qnum;	/* # of messages now in queue */
+ 		break;
+ 	}
+ 
+ 	return (kn->kn_fflags != 0);
+ }
+ 
+ struct filterops sysvmsg_filtops =
+ 	{ 0, filt_sysvmsgattach, filt_sysvmsgdetach, filt_sysvmsg };
Index: sys/event.h
===================================================================
RCS file: /usr/cvs/src/sys/sys/event.h,v
retrieving revision 1.5.2.5
diff -c -r1.5.2.5 event.h
*** sys/event.h	14 Dec 2001 19:21:22 -0000	1.5.2.5
--- sys/event.h	15 Aug 2002 12:59:53 -0000
***************
*** 36,43 ****
  #define EVFILT_PROC		(-5)	/* attached to struct proc */
  #define EVFILT_SIGNAL		(-6)	/* attached to struct proc */
  #define EVFILT_TIMER		(-7)	/* timers */
  
! #define EVFILT_SYSCOUNT		7
  
  #define EV_SET(kevp, a, b, c, d, e, f) do {	\
  	(kevp)->ident = (a);			\
--- 36,44 ----
  #define EVFILT_PROC		(-5)	/* attached to struct proc */
  #define EVFILT_SIGNAL		(-6)	/* attached to struct proc */
  #define EVFILT_TIMER		(-7)	/* timers */
+ #define	EVFILT_SYSVMSG		(-8)	/* System V messages */
  
! #define EVFILT_SYSCOUNT		8
  
  #define EV_SET(kevp, a, b, c, d, e, f) do {	\
  	(kevp)->ident = (a);			\
***************
*** 103,108 ****
--- 104,117 ----
  #define	NOTE_TRACK	0x00000001		/* follow across forks */
  #define	NOTE_TRACKERR	0x00000002		/* could not track child */
  #define	NOTE_CHILD	0x00000004		/* am a child process */
+ 
+ /*
+  * data/hint flags for EVFILT_SYSVMSG, shared with userspace
+  */
+ #define	NOTE_IPC_RMID	0x80000000		/* message queue deleted */
+ #define	NOTE_SYSVMSG	0x40000000		/* message enqueued */
+ #define	NOTE_SVMCMASK	0xf0000000		/* mask for hint bits */
+ #define	NOTE_SVMDMASK	0x000fffff		/* mask for msqid */
  
  /*
   * This is currently visible to userland to work around broken

--------------AB3423182009884E14B98646
Content-Type: text/plain; charset=us-ascii;
 name="rcv.c"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
 filename="rcv.c"

#include <sys/types.h>
#include<sys/event.h>
#include<sys/time.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#include <stdio.h>
#include <string.h>

#define	KEYPATH	"/root/RCALL/msg/foo"

/*
 * Message buffer handed to msgrcv() below: a leading long type field
 * followed by the message text.
 */
struct foo {
	long	mtype;		/* message type field */
	char	mtext[ 80];	/* message text; printed as a C string */
};



/*
 * Example receiver for EVFILT_SYSVMSG.
 *
 * Opens (creating if necessary) the System V message queue keyed on
 * KEYPATH, registers a kevent for NOTE_SYSVMSG on it, then loops forever:
 * each time the filter fires it reports the pending-message count from
 * ev.data and drains that many messages with msgrcv(), printing each text.
 *
 * Returns 2 on a setup failure and 3 on a failure inside the event loop.
 */
int
main(void)
{
	key_t		key;
	int		msqid;
	struct foo	foo;
	int		i;
	struct kevent	ev;
	struct timespec	nullts = { 0, 0 };
	int		kq;
	int		n;
	long		len;

	key = ftok(KEYPATH, 'b');
	if (key == (key_t)-1) {
		perror("ftok");
		return (2);
	}

	msqid = msgget(key, 0666 | IPC_CREAT);
	if (msqid == -1) {
		perror("msgget");
		return (2);
	}
	printf("Message queue %d\n", msqid);

	kq = kqueue();
	if (kq == -1) {
		perror("kqueue");
		return (2);
	}

	EV_SET(&ev,
		msqid,
		EVFILT_SYSVMSG,
		EV_ADD | EV_ENABLE,
		NOTE_SYSVMSG,
		0,
		0);

	/* register only; the zero timeout keeps this call from blocking */
	if (kevent(kq, &ev, 1, NULL, 0, &nullts) == -1) {
		perror("kevent");
		return (2);
	}

	for (;;) {
		n = kevent(kq, NULL, 0, &ev, 1, NULL);
		if (n == -1) {
			perror("kevent");
			return (3);
		}
		if (n == 0)
			continue;	/* nothing delivered: ev holds no fresh count */

		/* ev.data / ev.ident are pointer-width, so widen explicitly */
		printf("%ld messages pending on queue %lu ",
			(long)ev.data,
			(unsigned long)ev.ident);

		/* for each pending message, retrieve it */
		for (i = 0; i < ev.data; i++) {
			/*
			 * The size argument counts only the text bytes, not
			 * the leading mtype; MSG_NOERROR truncates an
			 * oversized message instead of failing.
			 * p3 = 0 :== receive any message
			 */
			len = (long)msgrcv(msqid, &foo, sizeof(foo.mtext), 0,
			    MSG_NOERROR);
			if (len == -1) {
				perror("msgrcv");
				return (3);
			}

			/* the sender is not obliged to NUL-terminate the text */
			if ((size_t)len >= sizeof(foo.mtext))
				len = (long)sizeof(foo.mtext) - 1;
			foo.mtext[len] = '\0';

			printf("snd says: '%s'\n", foo.mtext);
		}
	}

	/* NOTREACHED */

	/* destroy queue */
	msgctl(msqid, IPC_RMID, NULL);

	return (0);
}

--------------AB3423182009884E14B98646
Content-Type: text/plain; charset=us-ascii;
 name="snd.c"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
 filename="snd.c"

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#include <stdio.h>
#include <string.h>

#define	KEYPATH	"/root/RCALL/msg/foo"

/*
 * Message buffer handed to msgsnd() below: a leading long type field
 * followed by the message text.
 */
struct foo {
	long	mtype;		/* message type; main() sets it to 75 */
	char	mtext[ 80];	/* message text; filled with a C string */
};


/*
 * Example sender: opens (creating if necessary) the System V message
 * queue keyed on KEYPATH and enqueues one "Hello, world!" message of
 * type 75.
 *
 * Returns 0 on success, 2 if the queue cannot be opened, 3 if the send
 * fails.
 */
int
main(void)
{
	key_t		key;
	int		msqid;
	struct foo	foo;

	key = ftok(KEYPATH, 'b');
	if (key == (key_t)-1) {
		perror("ftok");
		return (2);
	}

	msqid = msgget(key, 0666 | IPC_CREAT);
	if (msqid == -1) {
		perror("msgget");
		return (2);
	}

	/* start from a zeroed buffer so no stack garbage is ever queued */
	memset(&foo, 0, sizeof(foo));
	foo.mtype = 75;
	strcpy(foo.mtext, "Hello, world!");

	/*
	 * The size argument counts only the text bytes (here the string
	 * plus its terminating NUL), not the leading mtype field.
	 */
	if (msgsnd(msqid, &foo, strlen(foo.mtext) + 1, 0) == -1) {
		perror("msgsnd");
		return (3);
	}

	return (0);
}

--------------AB3423182009884E14B98646
Content-Type: text/plain; charset=us-ascii;
 name="kqueue.udiff"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline;
 filename="kqueue.udiff"

Index: kern/kern_event.c
===================================================================
RCS file: /usr/cvs/src/sys/kern/kern_event.c,v
retrieving revision 1.2.2.8
diff -u -r1.2.2.8 kern_event.c
--- kern/kern_event.c	14 Dec 2001 19:24:42 -0000	1.2.2.8
+++ kern/kern_event.c	15 Aug 2002 11:33:50 -0000
@@ -122,6 +122,7 @@
 
 extern struct filterops aio_filtops;
 extern struct filterops sig_filtops;
+extern struct filterops sysvmsg_filtops;
 
 /*
  * Table for for all system-defined filters.
@@ -134,6 +135,7 @@
 	&proc_filtops,			/* EVFILT_PROC */
 	&sig_filtops,			/* EVFILT_SIGNAL */
 	&timer_filtops,			/* EVFILT_TIMER */
+	&sysvmsg_filtops,		/* EVFILT_SYSVMSG */
 };
 
 static int
Index: kern/sysv_msg.c
===================================================================
RCS file: /usr/cvs/src/sys/kern/sysv_msg.c,v
retrieving revision 1.23.2.3
diff -u -r1.23.2.3 sysv_msg.c
--- kern/sysv_msg.c	1 Nov 2000 17:58:06 -0000	1.23.2.3
+++ kern/sysv_msg.c	15 Aug 2002 13:37:16 -0000
@@ -40,6 +40,7 @@
 #undef MSG_DEBUG_OK
 
 static void msg_freehdr __P((struct msg *msghdr));
+static struct msqid_ds *msqid_to_msqptr __P((int umsqid));
 
 /* XXX casting to (sy_call_t *) is bogus, as usual. */
 static sy_call_t *msgcalls[] = {
@@ -112,7 +113,12 @@
     				/* 0..(MSGSEG-1) -> index of next segment */
 };
 
-#define MSG_LOCKED	01000	/* Is this msqid_ds locked? */
+struct i_msqid_ds {
+	struct msqid_ds	e;		/* externally visible */
+	struct klist	q_klist;	/* filters on this message queue */
+};
+
+#define MSG_LOCKED	01000	/* Is this i_msqid_ds locked? */
 
 static int nfree_msgmaps;	/* # of free map entries */
 static short free_msgmaps;	/* head of linked list of free map entries */
@@ -120,7 +126,7 @@
 static char *msgpool;		/* MSGMAX byte long msg buffer pool */
 static struct msgmap *msgmaps;	/* MSGSEG msgmap structures */
 static struct msg *msghdrs;	/* MSGTQL msg headers */
-static struct msqid_ds *msqids;	/* MSGMNI msqid_ds struct's */
+static struct i_msqid_ds *i_msqids;	/* MSGMNI i_msqid_ds struct's */
 
 static void
 msginit(dummy)
@@ -137,9 +143,9 @@
 	msghdrs = malloc(sizeof(struct msg) * msginfo.msgtql, M_MSG, M_WAITOK);
 	if (msghdrs == NULL)
 		panic("msghdrs is NULL");
-	msqids = malloc(sizeof(struct msqid_ds) * msginfo.msgmni, M_MSG, M_WAITOK);
-	if (msqids == NULL)
-		panic("msqids is NULL");
+	i_msqids = malloc(sizeof(struct i_msqid_ds) * msginfo.msgmni, M_MSG, M_WAITOK);
+	if (i_msqids == NULL)
+		panic("i_msqids is NULL");
 
 	/*
 	 * msginfo.msgssz should be a power of two for efficiency reasons.
@@ -183,13 +189,13 @@
     	}
 	free_msghdrs = &msghdrs[0];
 
-	if (msqids == NULL)
-		panic("msqids is NULL");
+	if (i_msqids == NULL)
+		panic("i_msqids is NULL");
 
 	for (i = 0; i < msginfo.msgmni; i++) {
-		msqids[i].msg_qbytes = 0;	/* implies entry is available */
-		msqids[i].msg_perm.seq = 0;	/* reset to a known value */
-		msqids[i].msg_perm.mode = 0;
+		i_msqids[i].e.msg_qbytes = 0;	/* implies entry is available */
+		i_msqids[i].e.msg_perm.seq = 0;	/* reset to a known value */
+		i_msqids[i].e.msg_perm.mode = 0;
 	}
 }
 SYSINIT(sysv_msg, SI_SUB_SYSV_MSG, SI_ORDER_FIRST, msginit, NULL)
@@ -243,6 +249,40 @@
 	free_msghdrs = msghdr;
 }
 
+static struct msqid_ds *	/* map a user msqid to its queue, or NULL */
+msqid_to_msqptr(umsqid)
+	int umsqid;
+{
+	int msqid;
+	struct msqid_ds *msqptr = NULL;	/* NULL: id does not name a live queue */
+
+	msqid = IPCID_TO_IX(umsqid);
+
+	if (msqid < 0 || msqid >= msginfo.msgmni) {
+#ifdef MSG_DEBUG_OK
+		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
+		    msginfo.msgmni);
+#endif
+	} else {
+
+		msqptr = (struct msqid_ds *)&i_msqids[msqid];
+
+		if (msqptr->msg_qbytes == 0) {
+#ifdef MSG_DEBUG_OK
+			printf("no such msqid\n");
+#endif
+			msqptr = NULL;
+		} else if (msqptr->msg_perm.seq != IPCID_TO_SEQ(umsqid)) {
+#ifdef MSG_DEBUG_OK
+			printf("wrong sequence number\n");
+#endif
+			msqptr = NULL;	/* reject even when MSG_DEBUG_OK is off */
+		}
+	}
+
+	return(msqptr);
+}
+
 #ifndef _SYS_SYSPROTO_H_
 struct msgctl_args {
 	int	msqid;
@@ -256,7 +296,6 @@
 	struct proc *p;
 	register struct msgctl_args *uap;
 {
-	int msqid = uap->msqid;
 	int cmd = uap->cmd;
 	struct msqid_ds *user_msqptr = uap->buf;
 	int rval, eval;
@@ -270,30 +309,8 @@
 	if (!jail_sysvipc_allowed && p->p_prison != NULL)
 		return (ENOSYS);
 
-	msqid = IPCID_TO_IX(msqid);
-
-	if (msqid < 0 || msqid >= msginfo.msgmni) {
-#ifdef MSG_DEBUG_OK
-		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
-		    msginfo.msgmni);
-#endif
+	if ((msqptr = msqid_to_msqptr(uap->msqid)) == NULL)
 		return(EINVAL);
-	}
-
-	msqptr = &msqids[msqid];
-
-	if (msqptr->msg_qbytes == 0) {
-#ifdef MSG_DEBUG_OK
-		printf("no such msqid\n");
-#endif
-		return(EINVAL);
-	}
-	if (msqptr->msg_perm.seq != IPCID_TO_SEQ(uap->msqid)) {
-#ifdef MSG_DEBUG_OK
-		printf("wrong sequence number\n");
-#endif
-		return(EINVAL);
-	}
 
 	eval = 0;
 	rval = 0;
@@ -305,6 +322,11 @@
 		struct msg *msghdr;
 		if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_M)))
 			return(eval);
+
+		/* notify intent before actually removing message queue */
+		KNOTE(&((struct i_msqid_ds *)msqptr)->q_klist,
+						NOTE_IPC_RMID | uap->msqid);
+
 		/* Free the message headers */
 		msghdr = msqptr->msg_first;
 		while (msghdr != NULL) {
@@ -411,7 +433,7 @@
 
 	if (key != IPC_PRIVATE) {
 		for (msqid = 0; msqid < msginfo.msgmni; msqid++) {
-			msqptr = &msqids[msqid];
+			msqptr = (struct msqid_ds *)&i_msqids[msqid];
 			if (msqptr->msg_qbytes != 0 &&
 			    msqptr->msg_perm.key == key)
 				break;
@@ -448,7 +470,7 @@
 			 * they are copying the message in/out.  We can't
 			 * re-use the entry until they release it.
 			 */
-			msqptr = &msqids[msqid];
+			msqptr = (struct msqid_ds *)&i_msqids[msqid];
 			if (msqptr->msg_qbytes == 0 &&
 			    (msqptr->msg_perm.mode & MSG_LOCKED) == 0)
 				break;
@@ -480,6 +502,7 @@
 		msqptr->msg_stime = 0;
 		msqptr->msg_rtime = 0;
 		msqptr->msg_ctime = time_second;
+		bzero(&((struct i_msqid_ds *)msqptr)->q_klist, sizeof(struct klist));
 	} else {
 #ifdef MSG_DEBUG_OK
 		printf("didn't find it and wasn't asked to create it\n");
@@ -507,7 +530,6 @@
 	struct proc *p;
 	register struct msgsnd_args *uap;
 {
-	int msqid = uap->msqid;
 	void *user_msgp = uap->msgp;
 	size_t msgsz = uap->msgsz;
 	int msgflg = uap->msgflg;
@@ -515,6 +537,7 @@
 	register struct msqid_ds *msqptr;
 	register struct msg *msghdr;
 	short next;
+	int s;
 
 #ifdef MSG_DEBUG_OK
 	printf("call to msgsnd(%d, 0x%x, %d, %d)\n", msqid, user_msgp, msgsz,
@@ -524,29 +547,8 @@
 	if (!jail_sysvipc_allowed && p->p_prison != NULL)
 		return (ENOSYS);
 
-	msqid = IPCID_TO_IX(msqid);
-
-	if (msqid < 0 || msqid >= msginfo.msgmni) {
-#ifdef MSG_DEBUG_OK
-		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
-		    msginfo.msgmni);
-#endif
+	if ((msqptr = msqid_to_msqptr(uap->msqid)) == NULL)
 		return(EINVAL);
-	}
-
-	msqptr = &msqids[msqid];
-	if (msqptr->msg_qbytes == 0) {
-#ifdef MSG_DEBUG_OK
-		printf("no such message queue id\n");
-#endif
-		return(EINVAL);
-	}
-	if (msqptr->msg_perm.seq != IPCID_TO_SEQ(uap->msqid)) {
-#ifdef MSG_DEBUG_OK
-		printf("wrong sequence number\n");
-#endif
-		return(EINVAL);
-	}
 
 	if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_W))) {
 #ifdef MSG_DEBUG_OK
@@ -812,6 +814,11 @@
 	msqptr->msg_lspid = p->p_pid;
 	msqptr->msg_stime = time_second;
 
+	s = splhigh();
+	KNOTE(&((struct i_msqid_ds *)msqptr)->q_klist,
+						NOTE_SYSVMSG | uap->msqid);
+	splx(s);
+
 	wakeup((caddr_t)msqptr);
 	p->p_retval[0] = 0;
 	return(0);
@@ -832,7 +839,6 @@
 	struct proc *p;
 	register struct msgrcv_args *uap;
 {
-	int msqid = uap->msqid;
 	void *user_msgp = uap->msgp;
 	size_t msgsz = uap->msgsz;
 	long msgtyp = uap->msgtyp;
@@ -851,29 +857,8 @@
 	if (!jail_sysvipc_allowed && p->p_prison != NULL)
 		return (ENOSYS);
 
-	msqid = IPCID_TO_IX(msqid);
-
-	if (msqid < 0 || msqid >= msginfo.msgmni) {
-#ifdef MSG_DEBUG_OK
-		printf("msqid (%d) out of range (0<=msqid<%d)\n", msqid,
-		    msginfo.msgmni);
-#endif
-		return(EINVAL);
-	}
-
-	msqptr = &msqids[msqid];
-	if (msqptr->msg_qbytes == 0) {
-#ifdef MSG_DEBUG_OK
-		printf("no such message queue id\n");
-#endif
+	if ((msqptr = msqid_to_msqptr(uap->msqid)) == NULL)
 		return(EINVAL);
-	}
-	if (msqptr->msg_perm.seq != IPCID_TO_SEQ(uap->msqid)) {
-#ifdef MSG_DEBUG_OK
-		printf("wrong sequence number\n");
-#endif
-		return(EINVAL);
-	}
 
 	if ((eval = ipcperm(p, &msqptr->msg_perm, IPC_R))) {
 #ifdef MSG_DEBUG_OK
@@ -1099,3 +1084,93 @@
 	p->p_retval[0] = msgsz;
 	return(0);
 }
+
+static int
+filt_sysvmsgattach(struct knote *kn)
+{
+	int msqid = kn->kn_id;
+	register struct i_msqid_ds *i_msqptr;
+
+	if ((i_msqptr = (struct i_msqid_ds *)msqid_to_msqptr(msqid)) == NULL)
+		return (ESRCH);
+
+	kn->kn_flags |= EV_CLEAR;		/* automatically set */
+
+	/* XXX locking?  this might compete with another process. */
+	SLIST_INSERT_HEAD(&i_msqptr->q_klist, kn, kn_selnext);
+
+	return (0);
+}
+
+/*
+ * The knote may be attached to a message queue which is deleted out
+ * from under us by another process, leaving nothing for the knote to
+ * be attached to.  So when the message queue is deleted, the knote
+ * is marked as DETACHED and also flagged as ONESHOT so it will be
+ * deleted when read out.  However, as part of the knote deletion,
+ * this routine is called, so a check is needed to avoid actually
+ * performing a detach, because the original message queue does not
+ * exist any more.  Note that reusing the queue ID will bzero the list
+ * head, orphaning the events which were linked to it, so this does
+ * not have to be tracked (though it seems a bit messy, this is what
+ * kqueue already does for exiting processes, FWIW).
+ */
+static void
+filt_sysvmsgdetach(struct knote *kn)
+{
+	int msqid = kn->kn_id;
+	register struct i_msqid_ds *i_msqptr;
+
+	if (kn->kn_status & KN_DETACHED)
+		return;
+
+	if ((i_msqptr = (struct i_msqid_ds *)msqid_to_msqptr(msqid)) == NULL)
+		return;
+
+	/* XXX locking?  this might compete with another process. */
+	SLIST_REMOVE(&i_msqptr->q_klist, kn, knote, kn_selnext);
+}
+
+/*
+ * Handle events on a given message queue object; called once for
+ * each object.
+ */
+static int
+filt_sysvmsg(struct knote *kn, long hint)
+{
+	u_int event;
+	int msqid;
+	register struct msqid_ds *msqptr;
+
+	/*
+	 * mask data out of "hint".
+	 */
+	event = (u_int)(hint & NOTE_SVMCMASK);
+	msqid = (int)(hint & NOTE_SVMDMASK);
+
+	if ((msqptr = msqid_to_msqptr(msqid)) == NULL)
+		return(0);
+
+	/*
+	 * if the user is interested in this event, record it.
+	 */
+	if (kn->kn_sfflags & event)
+		kn->kn_fflags |= event;
+
+	switch( event) {
+	case NOTE_IPC_RMID:	/* message queue is being removed */
+		/* flag the event as finished */
+		kn->kn_status |= KN_DETACHED;
+		kn->kn_flags |= (EV_EOF | EV_ONESHOT); 
+		break;
+
+	case NOTE_SYSVMSG:	/* a message was enqueued */
+		kn->kn_data = msqptr->msg_qnum;	/* # of messages now in queue */
+		break;
+	}
+
+	return (kn->kn_fflags != 0);
+}
+
+struct filterops sysvmsg_filtops =
+	{ 0, filt_sysvmsgattach, filt_sysvmsgdetach, filt_sysvmsg };
Index: sys/event.h
===================================================================
RCS file: /usr/cvs/src/sys/sys/event.h,v
retrieving revision 1.5.2.5
diff -u -r1.5.2.5 event.h
--- sys/event.h	14 Dec 2001 19:21:22 -0000	1.5.2.5
+++ sys/event.h	15 Aug 2002 12:59:53 -0000
@@ -36,8 +36,9 @@
 #define EVFILT_PROC		(-5)	/* attached to struct proc */
 #define EVFILT_SIGNAL		(-6)	/* attached to struct proc */
 #define EVFILT_TIMER		(-7)	/* timers */
+#define	EVFILT_SYSVMSG		(-8)	/* System V messages */
 
-#define EVFILT_SYSCOUNT		7
+#define EVFILT_SYSCOUNT		8
 
 #define EV_SET(kevp, a, b, c, d, e, f) do {	\
 	(kevp)->ident = (a);			\
@@ -103,6 +104,14 @@
 #define	NOTE_TRACK	0x00000001		/* follow across forks */
 #define	NOTE_TRACKERR	0x00000002		/* could not track child */
 #define	NOTE_CHILD	0x00000004		/* am a child process */
+
+/*
+ * data/hint flags for EVFILT_SYSVMSG, shared with userspace
+ */
+#define	NOTE_IPC_RMID	0x80000000		/* message queue deleted */
+#define	NOTE_SYSVMSG	0x40000000		/* message enqueued */
+#define	NOTE_SVMCMASK	0xf0000000		/* mask for hint bits */
+#define	NOTE_SVMDMASK	0x000fffff		/* mask for msqid */
 
 /*
  * This is currently visible to userland to work around broken

--------------AB3423182009884E14B98646--


To Unsubscribe: send mail to majordomo@FreeBSD.org
with "unsubscribe freebsd-hackers" in the body of the message




Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?3D5BACD5.405A71DF>