HDU-OS-3 Linux进程管理

多个并发运行的协作进程间通常需要进行信息交换,称为进程通信
像这样:

1
2
3
4
5
[*] Process_1:诶!!!你钓鱼去啊?
...
[*] Process_2:啊???不是,我钓鱼去!
...
[*] Process_1:嗨!!!我还以为你钓鱼去呢。

Folder准备

1
2
3
4
$ cd /home/user
$ mkdir os_3
$ cd os_3
$ mkdir vir_shell pipe_com msg_queue shared_mem

[*] Caution! 具体执行时 user 是你的用户名,若无强调,后面出现的所有 user 亦然

模拟shell

实验要求

实现一个模拟的shell:
编写三个不同的程序 cmd1.c,cmd2.c,cmd3.c,每个程序的功能自定,分别编译成可执行文件 cmd1,cmd2,cmd3。然后再编写一个程序,模拟 shell 程序的功能,能根据用户输入的字符串(表示相应的命令名),去为相应的命令创建子进程并让它去执行相应的程序,而父进程则等待子进程结束,然后再等待接收下一条命令。如果接收到的命令为 exit,则父进程结束;如果接收到的命令是无效命令,则显示 Command not found ,继续等待。

具体操作

cmd1

1
2
$ cd /home/user/os_3/vir_shell
$ vim cmd1.c
1
2
3
4
5
6
7
8
// /home/user/os_3/vir_shell/cmd1.c
#include <stdio.h>

int main()
{
printf("[*] Hello world! I'm cmd111111.\n");
return 0;
}
1
$ gcc cmd1.c -o cmd1

cmd2

1
$ vim cmd2.c
1
2
3
4
5
6
7
8
// /home/user/os_3/vir_shell/cmd2.c
#include <stdio.h>

int main()
{
printf("[*] Hello world! I'm cmd22222.\n");
return 0;
}
1
$ gcc cmd2.c -o cmd2

cmd3

1
$ vim cmd3.c
1
2
3
4
5
6
7
8
// /home/user/os_3/vir_shell/cmd3.c
#include <stdio.h>

int main()
{
printf("[*] Hello world! I'm cmd33333.\n");
return 0;
}
1
$ gcc cmd3.c -o cmd3

shell

1
$ vim shell.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// /home/user/os_3/vir_shell/shell.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>

char *sys_cmd [4] = {"exit", "cmd1", "cmd2", "cmd3"};

int get_cmd_id(char *cmd)
{
int i;
for (i = 0; i < 4; i++) {
if (strcmp(cmd, sys_cmd[i]) == 0) {
return i;
}
}
return -1;
}

void z_fork(int cmd_id)
{
pid_t pid = fork();
if (pid < 0) {
printf("[*] Error: fork() return error\n");
exit(0);
} else if (pid == 0) {
// children process
switch (cmd_id) {
case 1:
execl("./cmd1", "", NULL);
break;
case 2:
execl("./cmd2", "", NULL);
break;
case 3:
execl("./cmd3", "", NULL);
break;
default:
printf("[*] Error: command not found!!!\n");
exit(0);
}
}
}

int main()
{
char cmd[50];
int cmd_id = -1;
while (1) {
printf("-----------------------------------\n");
printf("Input command: \n> ");
scanf("%s", cmd);
cmd_id = get_cmd_id(cmd);
if (cmd_id == -1) {
printf("[*] Error: command not found!!!\n");
} else if (cmd_id == 0) {
printf("[*] Exit safely.\n");
exit(0);
} else {
z_fork(cmd_id);
}
wait(NULL);
}
}
1
$ gcc shell.c -o shell

实验效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ cd /home/user/os_3/vir_shell
$ ls
cmd1 cmd1.c cmd2 cmd2.c cmd3 cmd3.c shell shell.c
$ ./shell
-----------------------------------
Input command:
> cmd1
[*] Hello world! I'm cmd11111.
-----------------------------------
Input command:
> cmd2
[*] Hello world! I'm cmd22222.
-----------------------------------
Input command:
> cmd3
[*] Hello world! I'm cmd33333.
-----------------------------------
Input command:
> ipconfig
[*] Error: command not found!!!
-----------------------------------
Input command:
> exit
[*] Exit safely.

实验详解

fork()

fork() : 创建一个普通进程,调用一次返回两次,子进程中返回 0 ,父进程中返回 子进程pid 。清楚哪些代码块是子进程可以达到的,那些代码块是父进程可以达到的,对接下来的实验至关重要。

execl()

execl(const char *path, const char *arg, ... NULL) : 第一个参数 path 字符指针所指向要执行的文件路径, 接下来的参数代表执行该文件时传递的参数列表:argv[0] , argv[1] ... 最后一个参数须用空指针 NULL 作结束。

管道通信

实验要求

实现一个管道通信程序:
由父进程创建一个管道,然后再创建 3 个子进程,并由这三个子进程利用管道与父进程之间进行通信:子进程发送信息,父进程等三个子进程全部发完消息后再接收信息。通信的具体内容可根据自己的需要随意设计,要求能试验阻塞型读写过程中的各种情况,测试管道的默认大小,并且要实现进程间对管道的互斥访问。运行程序,观察各种情况下,进程实际读写的字节数以及进程阻塞唤醒的情况。

具体操作

父子进程通信

1
2
$ cd /home/user/os_3/pipe_com
$ vim pipe.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// /home/user/os_3/pipe_com/pipe.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <string.h>

#define SEM_W "sem_write"
#define SEM_R "sem_read"
#define ENABLE_NUM 3

void write_to_pipe(int fd[2], int sid);
int read_from_pipe(int fd[2]);
void P(sem_t *sem_ptr); // 原语操作P() = down() = wait()
void V(sem_t *sem_ptr); // 原语操作V() = up() = signal()
void catch_INT(int sig); // 捕获 Ctrl_c 退出父进程
void destroy_sem();

int main(void)
{
int fd[2];
pid_t pid; // Process ID Type 宏定义 unsigined int
int ret = -1, child_num = ENABLE_NUM, enable_val = -1, sid = 0, i;
sem_t *write_psx, *read_psx; // sem_t 信号量数据类型 unsigined int
write_psx = sem_open(SEM_W, O_CREAT, 0666, 1); // 创建并初始化有名信号量
read_psx = sem_open(SEM_R, O_CREAT, 0666, 0);

if (pipe(fd) < 0) { // 无名管道创建,成功返回 0,失败返回 -1
printf("pipe error\n");
exit(1);
}
for (i = 0; i < child_num; i++) {
pid = fork();
sid++;
if (pid < 0) {
printf("fork error\n");
destroy_sem();
exit(0);
} else if (pid == 0) { // child process 跳出 for 循环
break;
}
}
if (pid == 0) { // only children process could reach here
P(write_psx);
write_to_pipe(fd, sid);

V(read_psx);
V(write_psx);
exit(0);

} else { // only father process could reach here
signal(SIGINT, catch_INT);

while (1)
{
P(read_psx);
P(read_psx);
P(read_psx);

read_from_pipe(fd);
}
}
return 0;
}

void write_to_pipe(int fd[2], int sid)
{
int value;
char buf[10];
close(fd[0]);
memset(buf, sid + '0', sizeof(char)*10);
printf("[*] Children process %d write %d bytes data\n", sid, (int)sizeof(buf));
write(fd[1], buf, sizeof(buf)); // 若成功则返回写入的字节数,若出错则返回-1
}

int read_from_pipe(int fd[2])
{
int ret;
char buf[1024];
close(fd[1]);
memset(buf, '\0', sizeof(char)*1024);
ret = read(fd[0], buf, 1024); // 若成功则返回读到的字节数,若已到文件末尾则返回0,若出错则返回-1
while (ret > 0) {
printf("[*] Father process read %d bytes data: %s\n", ret, buf);
memset(buf, '\0', 1024);
ret = read(fd[0], buf, 1024);
}
return ret;
}

void P(sem_t *sem_ptr)
{
sem_wait(sem_ptr); // 信号量 sem<=0 则阻塞当前线程,sem>0 解除阻塞后将sem的值减一,表明公共资源经使用后减少
}

void V(sem_t *sem_ptr)
{
sem_post(sem_ptr); // 增加信号量的值,有线程阻塞信号量时,会使其中一个线程不再阻塞
}

void catch_INT(int sig)
{
printf("[*] Catch ctrl_c and exit\n");
destroy_sem();
exit(0);
}

void destroy_sem()
{
sem_unlink(SEM_W); // 从系统中删除有名信号量
sem_unlink(SEM_R);
}
1
$ gcc pipe.c -pthread -o pipe

管道默认大小

1
$ vim pipe_max.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// /home/user/os_3/pipe_com/pipe_max.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

void write_to_pipe(int fd[2]);

int main(void)
{
int fd[2];
pid_t pid;
int ret = -1;

if (pipe(fd) < 0) {
printf("[*] Error: pipe() return error\n");
exit(1);
}

pid = fork();
if (pid < 0) {
printf("[*] Error: fork() return error\n");
exit(1);
} else if (pid == 0) {
write_to_pipe(fd);
} else {
wait(NULL);
}

}

void write_to_pipe(int fd[2])
{
int count, ret;
char buffer[1024];
close(fd[0]);
memset(buffer, '*', 1024);

ret = write(fd[1], buffer, sizeof(buffer));
count = 1;
printf("[*] Pipe could write: %d bytes data\n", count * 1024);
while (1) {
ret = write(fd[1], buffer, sizeof(buffer)); // 管道被写满,则阻塞当前进程,等待数据被取走
if(ret == -1){
break; // 阻塞状态下无法自动跳出循环,可自行添加捕获 Ctrl_c 后跳出的代码
}
count++;
printf("[*] Pipe could write: %d bytes data\n", count * 1024);
}
}

实验效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ cd /home/dry/os_3/pipe_com
$ ./pipe # 测试父子进程管道通信
[*] Children process 1 write 10 bytes data
[*] Children process 2 write 10 bytes data
[*] Children process 3 write 10 bytes data
[*] Father process read 30 bytes data: 111111111122222222223333333333
^C
[*] Catch ctrl_c and exit
$ ./pipe # 多次测试,体验并发进程的不可预知性
[*] Children process 1 write 10 bytes data
[*] Children process 2 write 10 bytes data
[*] Children process 3 write 10 bytes data
[*] Father process read 30 bytes data: 222222222211111111113333333333
^C
[*] Catch ctrl_c and exit
$ ./pipe_max # 测试管道容量
[*] Pipe could write: 1024 bytes data
[*] Pipe could write: 2048 bytes data
... ... ...
[*] Pipe could write: 63488 bytes data
[*] Pipe could write: 64512 bytes data
[*] Pipe could write: 65536 bytes data
^C

实验详解

Question 1

如何保证子进程全部发完消息后,父进程再接收?
pipe_max.c 中,子进程在 Line 51read_psx 执行了三次 P原语操作 。而父进程在 Line 60 ~ 62read_psx 执行了三次 V原语操作 后才从管道中读取数据。当然,这些的前提是 read_psx 初值为 1
那么,令 read_psx 初值为 3 ,对应代码将如何修改?请自行尝试。

Question 2

为何在测试管道默认大小过程中在终端不断输出,而不是在数据填满管道时输出真正默认大小?
管道被写满时,阻塞当前进程。此为前提,所以笔者的想法是,被阻塞时的数据也就是最后一行数据即为管道默认大小。
当然,还有更好的写法。再向管道写数据的过程中不必输出,只是不断修改 max_size 的值。管道写满进程中断时手动跳过,之后输出 max_size 的值。感兴趣的自行尝试。

消息队列通信 NEW

实验要求

利用 linux 的消息队列通信机制实现两个线程间的通信:
编写程序创建三个线程:sender1 线程、sender2 线程和 receive 线程,三个线程的功能描述如下:
① sender1 线程:运行函数 sender1(),它创建一个消息队列,然后,等待用户通过终端输入一串字符,将这串字符通过消息队列发送给 receiver 线程;可循环发送多个消息,直到用户输入 exit 为止,表示它不再发消息,最后向 receiver 线程发送消息 end1 ,并且等待 receiver 的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,结束程序的运行。
② sender2 线程:运行函数 sender2(),它创建一个消息队列,然后,等待用户通过终端输入一串字符,将这串字符通过消息队列发送给 receiver 线程;可循环发送多个消息,直到用户输入 exit 为止,表示它不再发消息,最后向 receiver 线程发送消息 end2 ,并且等待 receiver 的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,结束程序的运行。
③ receiver 线程运行 receive(),它通过消息队列接收来自 sender1 和 sender2 两个线程的消息,将消息显示在终端屏幕上,当收到内容为 end1 的消息时,就向 sender1 发送一个应答消息 over1 ;当收到内容为 end2 的消息时,就向 sender2 发送一个应答消息 over2 ;消息收完后删除消息队列。使用合适的信号量机制实现三个线程之间的同步与互斥。

具体操作

1
2
$ cd /home/user/os_3/msg_queue
$ vim msg_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// /home/user/os_3/msg_queue/msg_queue.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define send_type 1 // 2种 消息类型
#define recv_type 2

#define send_1_to_recv 1 // 4种 消息走向
#define send_2_to_recv 2
#define recv_to_send_1 3
#define recv_to_send_2 4

#define bool int
#define false 0
#define true 1


void *send_thread_1(void *arg);
void *send_thread_2(void *arg);
void *recv_thread(void *arg);
void P(sem_t *sem_ptr);
void V(sem_t *sem_ptr);

sem_t send_psx, recv_psx, final_recv_1, final_recv_2; // 定义4个 信号量类型
pthread_t send_pid_1, send_pid_2, recv_pid; // pthread_t 实际为 unsigned long int
int count = 1; // 计数器,声明为全局
bool send_1_over = false; // sender1线程 是否已经结束
bool send_2_over = false; // sender2线程 是否已经结束



struct msgbuf
{
long mtype;
char mtext[256];
int mw2w; // message where to where 表示消息的走向
};
int msgid;

int main(void)
{
sem_init(&send_psx, 0, 1); // pshared = 0,同一进程中多线程的同步
sem_init(&recv_psx, 0, 0);
sem_init(&final_recv_1, 0, 0);
sem_init(&final_recv_2, 0, 0);

msgid = msgget(IPC_PRIVATE, 0666|IPC_CREAT); // 创建消息队列
if (msgid < 0) {
printf("[*] Error: msgget() return error\n");
exit(1);
}
pthread_create(&send_pid_1, NULL, send_thread_1, NULL); // 创建 线程,并将线程加入当前 进程
pthread_create(&send_pid_2, NULL, send_thread_2, NULL);
pthread_create(&recv_pid, NULL, recv_thread, NULL);

pthread_join(send_pid_1, NULL); // 阻塞调用 send / receive 线程,功能相反于设置守护线程的 SetDaemon()
pthread_join(send_pid_2, NULL);
pthread_join(recv_pid, NULL);

return 0;
}

void *send_thread_1(void *arg)
{
char info[256]; // 消息发送区
struct msgbuf s_msg; // 消息缓存区
s_msg.mtype = send_type;
s_msg.mw2w = send_1_to_recv;
while (1) {
P(&send_psx);

printf("[%d]\n", count);
printf("[*] Send_thread_1 send: ");
scanf("%s", info);

if ((strcmp(info, "exit") == 0) || (strcmp(info, "end1") == 0)) {
strcpy(s_msg.mtext, "end1");
msgsnd(msgid, &s_msg, sizeof(struct msgbuf), 0);
V(&recv_psx);
break;
}
strcpy(s_msg.mtext, info);
count++;
msgsnd(msgid, &s_msg, sizeof(struct msgbuf), 0);// 追加一条消息到消息队列中
V(&recv_psx);

}
P(&final_recv_1); // final_recv_1 处理 send_thread_1 最后一次接受消息的问题

msgrcv(msgid, &s_msg, sizeof(struct msgbuf), recv_type, 0); // 从消息队列中读一条消息
printf("[*] Send_thread_1 receive: %s\n", s_msg.mtext);
count++;

V(&send_psx);

if (send_1_over && send_2_over){ // 2个 sender线程 都发送过 'end' 且收到过 'over' 后,将移除消息队列
msgctl(msgid, IPC_RMID, 0); // 移除消息队列
}
pthread_exit(NULL); // 类比进程的终止 exit()
}

void *send_thread_2(void *arg)
{
char info[256]; // 消息发送区
struct msgbuf s_msg; // 消息缓存区
s_msg.mtype = send_type;
s_msg.mw2w = send_2_to_recv;
while (1) {
P(&send_psx);

printf("[%d]\n", count);
printf("[*] Send_thread_2 send: ");
scanf("%s", info);

if ((strcmp(info, "exit") == 0) || (strcmp(info, "end2") == 0)) {
strcpy(s_msg.mtext, "end2");
msgsnd(msgid, &s_msg, sizeof(struct msgbuf), 0);
V(&recv_psx);
break;
}
strcpy(s_msg.mtext, info);
count++;
msgsnd(msgid, &s_msg, sizeof(struct msgbuf), 0);// 追加一条消息到消息队列中
V(&recv_psx);

}
P(&final_recv_2); // final_recv_2 处理 send_thread_2 最后一次接受消息的问题

count++;
msgrcv(msgid, &s_msg, sizeof(struct msgbuf), recv_type, 0); // 从消息队列中读一条消息
printf("[*] Send_thread_2 receive: %s\n", s_msg.mtext);

V(&send_psx);

if (send_1_over && send_2_over){ // 2个 sender 线程 都发送过 'end' 且收到过 'over' 后,将移除消息队列
msgctl(msgid, IPC_RMID, 0); // 移除消息队列
}
pthread_exit(NULL); // 类比进程的终止 exit()
}

void *recv_thread(void *arg)
{
struct msgbuf r_msg; // 消息缓存区
while (1) {
P(&recv_psx);
msgrcv(msgid, &r_msg, sizeof(struct msgbuf), send_type, 0);
if (r_msg.mw2w == send_1_to_recv){ // 根据 消息走向 判断来源
if (strcmp(r_msg.mtext, "end1") == 0) {
strcpy(r_msg.mtext, "over1");
r_msg.mtype = recv_type;
r_msg.mw2w = recv_to_send_1;
msgsnd(msgid, &r_msg, sizeof(struct msgbuf), 0);
printf("[*] Recv_thread receive 'end1' from Send_thread_1, return 'over1'\n");

V(&final_recv_1);
send_1_over = true;
}
else {
printf("[*] Recv_thread receive: %s from Send_thread_1\n", r_msg.mtext);
V(&send_psx);
}
}
else if (r_msg.mw2w == send_2_to_recv) { // 根据 消息走向 判断来源
if (strcmp(r_msg.mtext, "end2") == 0) {
strcpy(r_msg.mtext, "over2");
r_msg.mtype = recv_type;
r_msg.mw2w = recv_to_send_2;
msgsnd(msgid, &r_msg, sizeof(struct msgbuf), 0);
printf("[*] Recv_thread receive 'end2' from Send_thread_2, return 'over2'\n");

V(&final_recv_2);
send_2_over = true;

}
else {
printf("[*] Recv_thread receive: %s from Send_thread_2\n", r_msg.mtext);
V(&send_psx);
}
}


if (send_1_over && send_2_over){ // 2个 sender线程 都发送过 'end' 且收到过 'over' 后,将跳出循环,结束当前线程
break;
}
}
pthread_exit(NULL);
}

void P(sem_t *sem_ptr)
{
sem_wait(sem_ptr);
}

void V(sem_t *sem_ptr)
{
sem_post(sem_ptr);
}
1
$ gcc msg_queue.c -pthread -o msg_queue

实验效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ ./msg_queue
[1]
[*] Send_thread_1 send: 111
[*] Recv_thread receive: 111 from Send_thread_1
[2]
[*] Send_thread_2 send: 222
[*] Recv_thread receive: 222 from Send_thread_2
[3]
[*] Send_thread_1 send: hello
[*] Recv_thread receive: hello from Send_thread_1
[4]
[*] Send_thread_2 send: world
[*] Recv_thread receive: world from Send_thread_2
[5]
[*] Send_thread_1 send: exit
[*] Recv_thread receive 'end1' from Send_thread_1, return 'over1'
[*] Send_thread_1 receive: over1
[6]
[*] Send_thread_2 send: bye
[*] Recv_thread receive: bye from Send_thread_2
[7]
[*] Send_thread_2 send: exit
[*] Recv_thread receive 'end2' from Send_thread_2, return 'over2'
[*] Send_thread_2 receive: over2

实验详解

此处不再赘述,具体请参见 NEW 的注释 以及 OLD 的详解

消息队列通信 OLD

实验要求

利用 linux 的消息队列通信机制实现两个线程间的通信:
编写程序创建两个线程:sender 线程和 receive 线程,其中 sender 线程运行函数 sender(),它创建一个消息队列,然后,循环等待用户通过终端输入一串字符,将这串字符通过消息队列发送给 receiver 线程,直到用户输入 exit 为止;最后,它向 receiver 线程发送消息 end ,并且等待 receiver 的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,删除相关消息队列,结束程序的运行。receiver 线程运行 receive(),它通过消息队列接收来自 sender 的消息,将消息显示在终端屏幕上,直至收到内容为 end 的消息为止,此时,它向 sender 发送一个应答消息 over ,结束程序的运行。使用 无名信号量 实现两个线程之间的同步与互斥。

具体操作

1
2
$ cd /home/user/os_3/msg_queue
$ vim msg_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// /home/user/os_3/msg_queue/msg_queue.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>

void *send_thread(void *arg);
void *recv_thread(void *arg);
void P(sem_t *sem_ptr);
void V(sem_t *sem_ptr);

sem_t send_psx, recv_psx, final_recv;
pthread_t send_pid, recv_pid; // 实际为unsigned long int

struct msgbuf
{
long mtype;
char mtext[256];
};
int msgid;

int main(void)
{
sem_init(&send_psx, 0, 1); // pshared = 0,同一进程中多线程的同步
sem_init(&recv_psx, 0, 0);
sem_init(&final_recv, 0, 0);
msgid = msgget(IPC_PRIVATE, 0666|IPC_CREAT); // 创建消息队列
if (msgid < 0) {
printf("[*] Error: msgget() return error\n");
exit(1);
}
pthread_create(&send_pid, NULL, send_thread, NULL); // 创建 线程,并将线程加入当前 进程
pthread_create(&recv_pid, NULL, recv_thread, NULL);

pthread_join(send_pid, NULL); // 阻塞调用 send / receive 线程,相反于设置守护线程的 SetDaemon()
pthread_join(recv_pid, NULL);

return 0;
}

void *send_thread(void *arg)
{
int count = 1;
char info[256]; // 消息接收区
struct msgbuf s_msg; // 消息缓存区
s_msg.mtype = 1;
while (1) {
P(&send_psx);
printf("[%d]\n", count);
printf("[*] Send_thread send: ");
scanf("%s", info);
if ((strcmp(info, "exit") == 0) || (strcmp(info, "end") == 0)) {
strcpy(s_msg.mtext, "end");
msgsnd(msgid, &s_msg, sizeof(struct msgbuf), 0);
V(&recv_psx);
break;
}
strcpy(s_msg.mtext, info);
count++;
msgsnd(msgid, &s_msg, sizeof(struct msgbuf), 0);// 追加一条消息到消息队列中

//V(&send_psx);
V(&recv_psx); // 保证了阻塞接受
}
P(&final_recv);
msgrcv(msgid, &s_msg, sizeof(struct msgbuf), 2, 0); // 从消息队列中读一条消息
printf("[*] Send_thread receive: %s\n", s_msg.mtext);
msgctl(msgid, IPC_RMID, 0); // 移出消息队列
pthread_exit(NULL); // 类比进程的终止 exit()
}

void *recv_thread(void *arg)
{
struct msgbuf r_msg; // 消息缓存区
r_msg.mtype = 1;
while (1) {
P(&recv_psx);
msgrcv(msgid, &r_msg, sizeof(struct msgbuf), 1, 0);
if (strcmp(r_msg.mtext, "end") == 0) {
strcpy(r_msg.mtext, "over");
r_msg.mtype = 2;
msgsnd(msgid, &r_msg, sizeof(struct msgbuf), 0);
printf("[*] Recv_thread receive 'end', return 'over'\n");

V(&final_recv);
break;
}
printf("[*] Recv_thread receive: %s\n", r_msg.mtext);
V(&send_psx); // 保证了阻塞发送

}
pthread_exit(NULL);
}

void P(sem_t *sem_ptr)
{
sem_wait(sem_ptr);
}

void V(sem_t *sem_ptr)
{
sem_post(sem_ptr);
}
1
$ gcc msg_queue.c -pthread -o msg_queue

实验效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ ./msg_queue
[1]
[*] Send_thread send: Hello,world!
[*] Recv_thread receive: ello,world!
[2]
[*] Send_thread send: Hello,Moto!
[*] Recv_thread receive: Hello,Moto!
[3]
[*] Send_thread send: Hello,Kitty!
[*] Recv_thread receive: Hello,Kitty!
[4]
[*] Send_thread send: exit
[*] Recv_thread receive 'end', return 'over'
[*] Send_thread receive: over

实验详解

Question 1

为何代码中采用 [阻塞发送-阻塞接收] ?
生活中大多数通信为全双工,即两方均可收发消息,也就是听别人发言的时候可以插嘴。常用的通信方式还有半双工,即两方均可收发消息,但不能都发消息,也就是同时只能有一方说话,对方不许插嘴。
本实验的通信采用单工,即一方只发消息,一方只收消息。且 [阻塞发送-阻塞接收] ,就是甲说完话,乙再听话,乙听完话,甲再说话。
那么回归问题本身,为何采用这种通信方式呢?为了在终端下 好看! 你当然可以改成 [无阻塞发送-阻塞接收],但我保证你不会喜欢那个效果的。

共享内存通信

实验要求

利用 linux 的共享内存通信机制实现两个进程间的通信:
编写程序 sender,它创建一个共享内存,然后等待用户通过终端输入一串字符,并将这串字符通过共享内存发送给 receiver;最后,它等待 receiver 的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,删除共享内存,结束程序的运行。编写 receiver 程序,它通过共享内存接收来自 sender 的消息,将消息显示在终端屏幕上,然后再通过该共享内存向 sender 发送一个应答消息 over ,结束程序的运行。使用 有名信号量System V 信号量实现两个进程对共享内存的互斥及同步使用。

具体操作

1
2
$ cd /home/user/os_3/shared_mem
$ vim init.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// /home/user/os_3/shared_mem/init.h
#ifndef _INIT_H_
#define _INIT_H_

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
#include <sys/ipc.h>
#include <sys/shm.h>

#define SHM_SIZE 1024
#define KEY_NUM 1111

#define SEM_SEND "sem_send"
#define SEM_RECV "sem_recv"
#define SEM_FIN "sem_final"

void P(sem_t *semp);
void V(sem_t *semp);

#endif
1
$ vim init.c
1
2
3
4
5
6
7
8
9
10
11
// /home/user/os_3/shared_mem/init.c
#include "init.h"

void P(sem_t *semp)
{
sem_wait(semp);
}
void V(sem_t *semp)
{
sem_post(semp);
}
1
$ vim sender.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// /home/user/os_3/shared_mem/sender.c
#include "init.h"

key_t key;
int shmid;
void *shmp;

sem_t *send_psx;
sem_t *recv_psx;
sem_t *final_psx;

void init();
void destroy();
void catch_INT(int sig);
int check_value(sem_t *semtmp, int style);
int main(void)
{
int ret = -1;
char input[SHM_SIZE];
char info[SHM_SIZE];
signal(SIGINT, catch_INT);
init();

memset(input, '\0', sizeof(input));
// send msg to receiver
while (1) {
printf("[*] Send message: ");
scanf("%s", input);

P(send_psx);
strcpy(info, (char *)shmp);
strcat(info, input);
strcpy((char *)shmp, info);
if (strcmp(input, "exit") == 0) {
ret = check_value(recv_psx, 0); // check recv_psx value,if can release,return 1
if (ret == 1) {
V(recv_psx);
}
break;
}
memset(input, '\0', sizeof(input));
ret = check_value(recv_psx, 0);
if (ret == 1) {
V(recv_psx);
}
V(send_psx);
}

// recv end signal
P(final_psx);
strcpy(input, (char *)shmp);
printf("[*] Recv messsge: %s\n", input);
destroy();
printf("[*] Sender end\n");
return 0;
}

void init()
{
key = KEY_NUM;
shmid = shmget(key, SHM_SIZE, 0666|IPC_CREAT); // 创建 share memory,成功则返回id (一个与key相关的标识符)
if (shmid < 0) {
printf("{*] Error: shmget() return error\n");
exit(1);
}
shmp = shmat(shmid, NULL, 0); // shared memory attach,连接到当前进程地址空间,返回指向 share memory 的指针
memset((char *)shmp, '\0', SHM_SIZE);
send_psx = sem_open(SEM_SEND, O_CREAT, 0666, 1); // 有名信号量,适用于不同进程间的同步互斥
recv_psx = sem_open(SEM_RECV, O_CREAT, 0666, 0);
final_psx = sem_open(SEM_FIN, O_CREAT, 0666, 0);
}
void destroy()
{
shmdt(shmp); // share memory detach attach 断开共享内存与当前进程地址空间的连接
shmctl(shmid, IPC_RMID, NULL); // 撤销 share memory
sem_unlink(SEM_SEND);
sem_unlink(SEM_RECV);
sem_unlink(SEM_FIN);
}
int check_value(sem_t *semtmp, int style)
{
// return 0: can not release this sem
// return 1: you should release it
// style means we check value with different ways
int ret = -1, sem_vl;
ret = sem_getvalue(semtmp, &sem_vl);
if (ret == -1) {
printf("[*] Error:get sem value error\n");
destroy();
exit(0);
}

if (sem_vl == style)
return 1;
else
return 0;

}

void catch_INT(int sig)
{
printf("[*] Catch SIGINT\n");
destroy();
exit(0);
}
1
$ vim receiver.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// /home/user/os_3/shared_mem/receiver.c
#include "init.h"

key_t key;
int shmid;
void *shmp;

sem_t *send_psx; // 信号量结构类型
sem_t *recv_psx;
sem_t *final_psx;

void init();
void close_all();
void catch_INT(int sig);

int main(void)
{
char input[SHM_SIZE];
signal(SIGINT, catch_INT);
init();
// recv msg from sender
while (1) {
P(recv_psx);
strcpy(input, (char *)shmp);
memset((char *)shmp, '\0', SHM_SIZE);
printf("[*] Recv message: %s\n", input);
if (strcmp(input, "exit") == 0) {
strcpy((char *)shmp, "over");
printf("[*] Send message: over\n");
V(final_psx);
break;
}
}
close_all();
printf("[*] Receiver end\n");
return 0;
}

void init()
{
key = KEY_NUM;
shmid = shmget(key, SHM_SIZE, 0666|IPC_CREAT);
if (shmid < 0) {
printf("[*] Error: shmget() return error\n");
exit(1);
}
shmp = shmat(shmid, NULL, 0);
send_psx = sem_open(SEM_SEND, O_CREAT, 0666, 1);
recv_psx = sem_open(SEM_RECV, O_CREAT, 0666, 0);
final_psx = sem_open(SEM_FIN, O_CREAT, 0666, 0);
}

void close_all()
{
sem_close(send_psx);
sem_close(recv_psx);
sem_close(final_psx);
shmdt(shmp);
}

void catch_INT(int sig)
{
printf("[*] Catch SIGINT\n");
close_all();
exit(0);
}
1
$ vim Makefile
1
2
3
4
5
all: sender receiver
sender: sender.c init.c
gcc sender.c init.c -pthread -o sender
receiver: receiver.c init.c
gcc receiver.c init.c -pthread -o receiver

[*] Caution! gcc 前是 TAB 而非多个 ,错误缩进编译时会导致中间代码 .o 文件生成失败

1
2
3
4
5
$ make
gcc sender.c init.c -pthread -o sender
gcc receiver.c init.c -pthread -o receiver
$ ls
init.c init.h Makefile receiver receiver.c sender sender.c

实验效果

1
2
3
4
5
6
7
user@user-linux:~/os_3/shared_mem$ ./sender         │user@user-linux:~/os_3/shared_mem$ ./receiver
[*] Send message: 123 │[*] Recv message: 123
[*] Send message: 456 │[*] Recv message: 456
[*] Send message: 789 │[*] Recv message: 789
[*] Send message: exit │[*] Recv message: exit
[*] Recv messsge: over │[*] Send message: over
[*] Sender end │[*] Receiver end

实验详解

Question 1

函数check_value()的必要性?
check_value(sem_t *semtmp, int style) 可以判断是否有 线程/进程 阻塞在 sem_wait(semtmp) 上,即 check_value(recv_psx,0) 可以判断是否有线程/进程在 读取 ,若没有,才对 recv_psx 进行 V原语操作 ,释放 读取 的资源,允许 读取 。保证了同时只能有一个线程/进程在 读取

附录 NEW

新版实验整体效果如下,请在新标签页打开图片(I)

附录 OLD

老版实验整体效果如下,请在新标签页打开图片(I)