kqueue例子
2024-10-04 11:10:22
网络服务器通常都使用epoll进行异步IO处理,而开发者通常使用mac,为了方便开发,我把自己的handy库移植到了mac平台上。移植过程中,网上居然没有搜到kqueue的使用例子,让我惊讶不已。为了让大家不用像我一样再次花费大力气搞定kqueue,我整理了一个简单清晰可运行的kqueue例子,供大家参考。
kqueue一共有几个函数:
//类似epoll_create
int kqueue(void);
//兼具epoll_ctl及epoll_wait功能
int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);
//设定kevent参数的宏
EV_SET(&kev, ident, filter, flags, fflags, data, udata);
struct kevent {
uintptr_t ident; /* identifier for this event */
int16_t filter; /* filter for event */
uint16_t flags; /* general flags */
uint32_t fflags; /* filter-specific flags */
intptr_t data; /* filter-specific data */
void *udata; /* opaque user data identifier */
};
函数调用示例:
//创建kqueue
int epollfd = kqueue();
//添加或者修改fd
struct kevent ev[];
int n = ;
if (events & kReadEvent) {
EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, , , (void*)(intptr_t)fd);
} else if (modify){
EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, , , (void*)(intptr_t)fd);
}
if (events & kWriteEvent) {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, , , (void*)(intptr_t)fd);
} else if (modify){
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, , , (void*)(intptr_t)fd);
}
printf("%s fd %d events read %d write %d\n",
modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
int r = kevent(efd, ev, n, NULL, , NULL);
//获取ready的fd
struct timespec timeout;
timeout.tv_sec = waitms / ;
timeout.tv_nsec = (waitms % ) * * ;
const int kMaxEvents = ;
struct kevent activeEvs[kMaxEvents];
int n = kevent(efd, NULL, , activeEvs, kMaxEvents, &timeout);
//处理IO事件
for (int i = ; i < n; i ++) {
int fd = (int)(intptr_t)activeEvs[i].udata;
int events = activeEvs[i].filter;
if (events == EVFILT_READ) {
handleRead(efd, fd);
} else if (events == EVFILT_WRITE) {
handleWrite(efd, fd);
}
}
注意kevent与epoll最大的不同在于READ/WRITE事件是分开注册并且分开返回的,而Epoll则是一个fd一次返回读和写事件,用标志位来判断。
可以运行的代码如下:kqueue-example(handy对kqueue提供了封装版本)
#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h> #define exit_if(r, ...) if(r) {printf(__VA_ARGS__); printf("error no: %d error msg %s\n", errno, strerror(errno)); exit(1);} const int kReadEvent = ;
const int kWriteEvent = ; void setNonBlock(int fd) {
int flags = fcntl(fd, F_GETFL, );
exit_if(flags<, "fcntl failed");
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
exit_if(r<, "fcntl failed");
} void updateEvents(int efd, int fd, int events, bool modify) {
struct kevent ev[];
int n = ;
if (events & kReadEvent) {
EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, , , (void*)(intptr_t)fd);
} else if (modify){
EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, , , (void*)(intptr_t)fd);
}
if (events & kWriteEvent) {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, , , (void*)(intptr_t)fd);
} else if (modify){
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, , , (void*)(intptr_t)fd);
}
printf("%s fd %d events read %d write %d\n",
modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
int r = kevent(efd, ev, n, NULL, , NULL);
exit_if(r, "kevent failed ");
} void handleAccept(int efd, int fd) {
struct sockaddr_in raddr;
socklen_t rsz = sizeof(raddr);
int cfd = accept(fd,(struct sockaddr *)&raddr,&rsz);
exit_if(cfd<, "accept failed");
sockaddr_in peer, local;
socklen_t alen = sizeof(peer);
int r = getpeername(cfd, (sockaddr*)&peer, &alen);
exit_if(r<, "getpeername failed");
printf("accept a connection from %s\n", inet_ntoa(raddr.sin_addr));
setNonBlock(cfd);
updateEvents(efd, cfd, kReadEvent|kWriteEvent, false);
} void handleRead(int efd, int fd) {
char buf[];
int n = ;
while ((n=::read(fd, buf, sizeof buf)) > ) {
printf("read %d bytes\n", n);
int r = ::write(fd, buf, n); //写出读取的数据
//实际应用中,写出数据可能会返回EAGAIN,此时应当监听可写事件,当可写时再把数据写出
exit_if(r<=, "write error");
}
if (n< && (errno == EAGAIN || errno == EWOULDBLOCK))
return;
exit_if(n<, "read error"); //实际应用中,n<0应当检查各类错误,如EINTR
printf("fd %d closed\n", fd);
close(fd);
} void handleWrite(int efd, int fd) {
//实际应用应当实现可写时写出数据,无数据可写才关闭可写事件
updateEvents(efd, fd, kReadEvent, true);
} void loop_once(int efd, int lfd, int waitms) {
struct timespec timeout;
timeout.tv_sec = waitms / ;
timeout.tv_nsec = (waitms % ) * * ;
const int kMaxEvents = ;
struct kevent activeEvs[kMaxEvents];
int n = kevent(efd, NULL, , activeEvs, kMaxEvents, &timeout);
printf("epoll_wait return %d\n", n);
for (int i = ; i < n; i ++) {
int fd = (int)(intptr_t)activeEvs[i].udata;
int events = activeEvs[i].filter;
if (events == EVFILT_READ) {
if (fd == lfd) {
handleAccept(efd, fd);
} else {
handleRead(efd, fd);
}
} else if (events == EVFILT_WRITE) {
handleWrite(efd, fd);
} else {
exit_if(, "unknown event");
}
}
} int main() {
short port = ;
int epollfd = kqueue();
exit_if(epollfd < , "epoll_create failed");
int listenfd = socket(AF_INET, SOCK_STREAM, );
exit_if(listenfd < , "socket failed");
struct sockaddr_in addr;
memset(&addr, , sizeof addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
int r = ::bind(listenfd,(struct sockaddr *)&addr, sizeof(struct sockaddr));
exit_if(r, "bind to 0.0.0.0:%d failed %d %s", port, errno, strerror(errno));
r = listen(listenfd, );
exit_if(r, "listen failed %d %s", errno, strerror(errno));
printf("fd %d listening at %d\n", listenfd, port);
setNonBlock(listenfd);
updateEvents(epollfd, listenfd, kReadEvent, false);
for (;;) { //实际应用应当注册信号处理函数,退出时清理资源
loop_once(epollfd, listenfd, );
}
return ;
}
最新文章
- selenium 常见面试题以及答案(Java版)
- 去 IOE,MySQL 完胜 PostgreSQL
- Ipad 日程管理APP使用心得
- hihoCode 1078 : 线段树的区间修改
- [DataTable] datatable根据表中的字段进行排序
- IOS设计模式之二(门面模式,装饰器模式)
- JS正则表达式验证数字非常全
- Git 远程仓库的管理和使用
- TexturePacker 介绍
- 2013 吉林通化邀请赛 D-City 离线型的并查集
- CDOJ 1269 ZhangYu Speech
- git异常操作解决办法合集
- AtomicInteger学习
- LeetCode(3):无重复字符的最大子串
- Ubuntu或linux 运行后台进程运行不挂断的办法
- thinkphp5 Exception类重定义
- springCloud相关推荐
- pygme 安装
- 吴裕雄 python 爬虫(1)
- Android 监听apk安装替换卸载广播