5042 stop using deprecated atomic functions
--- old/usr/src/uts/common/io/stream.c
+++ new/usr/src/uts/common/io/stream.c
1 1 /*
2 2 * CDDL HEADER START
3 3 *
4 4 * The contents of this file are subject to the terms of the
5 5 * Common Development and Distribution License (the "License").
6 6 * You may not use this file except in compliance with the License.
7 7 *
8 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 9 * or http://www.opensolaris.org/os/licensing.
10 10 * See the License for the specific language governing permissions
11 11 * and limitations under the License.
12 12 *
13 13 * When distributing Covered Code, include this CDDL HEADER in each
14 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 15 * If applicable, add the following below this CDDL HEADER, with the
16 16 * fields enclosed by brackets "[]" replaced with your own identifying
17 17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 18 *
19 19 * CDDL HEADER END
20 20 */
21 21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
22 22 /* All Rights Reserved */
23 23
24 24 /*
25 25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
26 26 * Use is subject to license terms.
27 27 */
28 28
29 29 #include <sys/types.h>
30 30 #include <sys/param.h>
31 31 #include <sys/thread.h>
32 32 #include <sys/sysmacros.h>
33 33 #include <sys/stropts.h>
34 34 #include <sys/stream.h>
35 35 #include <sys/strsubr.h>
36 36 #include <sys/strsun.h>
37 37 #include <sys/conf.h>
38 38 #include <sys/debug.h>
39 39 #include <sys/cmn_err.h>
40 40 #include <sys/kmem.h>
41 41 #include <sys/atomic.h>
42 42 #include <sys/errno.h>
43 43 #include <sys/vtrace.h>
44 44 #include <sys/ftrace.h>
45 45 #include <sys/ontrap.h>
46 46 #include <sys/multidata.h>
47 47 #include <sys/multidata_impl.h>
48 48 #include <sys/sdt.h>
49 49 #include <sys/strft.h>
50 50
51 51 #ifdef DEBUG
52 52 #include <sys/kmem_impl.h>
53 53 #endif
54 54
55 55 /*
56 56 * This file contains all the STREAMS utility routines that may
57 57 * be used by modules and drivers.
58 58 */
59 59
60 60 /*
61 61 * STREAMS message allocator: principles of operation
62 62 *
63 63 * The streams message allocator consists of all the routines that
64 64 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
65 65 * dupb(), freeb() and freemsg(). What follows is a high-level view
66 66 * of how the allocator works.
67 67 *
68 68 * Every streams message consists of one or more mblks, a dblk, and data.
69 69 * All mblks for all types of messages come from a common mblk_cache.
70 70 * The dblk and data come in several flavors, depending on how the
71 71 * message is allocated:
72 72 *
73 73 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
74 74 * fixed-size dblk/data caches. For message sizes that are multiples of
75 75 * PAGESIZE, dblks are allocated separately from the buffer.
76 76 * The associated buffer is allocated by the constructor using kmem_alloc().
77 77 * For all other message sizes, dblk and its associated data are allocated
78 78 * as a single contiguous chunk of memory.
79 79 * Objects in these caches consist of a dblk plus its associated data.
80 80 * allocb() determines the nearest-size cache by table lookup:
81 81 * the dblk_cache[] array provides the mapping from size to dblk cache.
82 82 *
83 83 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
84 84 * kmem_alloc()'ing a buffer for the data and supplying that
85 85 * buffer to gesballoc(), described below.
86 86 *
87 87 * (3) The four flavors of [d]esballoc[a] are all implemented by a
88 88 * common routine, gesballoc() ("generic esballoc"). gesballoc()
89 89 * allocates a dblk from the global dblk_esb_cache and sets db_base,
90 90 * db_lim and db_frtnp to describe the caller-supplied buffer.
91 91 *
92 92 * While there are several routines to allocate messages, there is only
93 93 * one routine to free messages: freeb(). freeb() simply invokes the
94 94 * dblk's free method, dbp->db_free(), which is set at allocation time.
95 95 *
96 96 * dupb() creates a new reference to a message by allocating a new mblk,
97 97 * incrementing the dblk reference count and setting the dblk's free
98 98 * method to dblk_decref(). The dblk's original free method is retained
99 99 * in db_lastfree. dblk_decref() decrements the reference count on each
100 100 * freeb(). If this is not the last reference it just frees the mblk;
101 101 * if this *is* the last reference, it restores db_free to db_lastfree,
102 102 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
103 103 *
104 104 * The implementation makes aggressive use of kmem object caching for
105 105 * maximum performance. This makes the code simple and compact, but
106 106 * also a bit abstruse in some places. The invariants that constitute a
107 107 * message's constructed state, described below, are more subtle than usual.
108 108 *
109 109 * Every dblk has an "attached mblk" as part of its constructed state.
110 110 * The mblk is allocated by the dblk's constructor and remains attached
111 111 * until the message is either dup'ed or pulled up. In the dupb() case
112 112 * the mblk association doesn't matter until the last free, at which time
113 113 * dblk_decref() attaches the last mblk to the dblk. pullupmsg() affects
114 114 * the mblk association because it swaps the leading mblks of two messages,
115 115 * so it is responsible for swapping their db_mblk pointers accordingly.
116 116 * From a constructed-state viewpoint it doesn't matter that a dblk's
117 117 * attached mblk can change while the message is allocated; all that
118 118 * matters is that the dblk has *some* attached mblk when it's freed.
119 119 *
120 120 * The sizes of the allocb() small-message caches are not magical.
121 121 * They represent a good trade-off between internal and external
122 122 * fragmentation for current workloads. They should be reevaluated
123 123 * periodically, especially if allocations larger than DBLK_MAX_CACHE
124 124 * become common. We use 64-byte alignment so that dblks don't
125 125 * straddle cache lines unnecessarily.
126 126 */
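
For readers new to this allocator, the external contract is small: allocb() and friends return an mblk chain, dupb()/dupmsg() add references to the underlying dblk, and freeb()/freemsg() drop them, with the dblk's db_free method deciding what the last free actually does. A minimal sketch of that lifecycle from a module's point of view (illustrative only, not part of this change):

#include <sys/stream.h>

/*
 * Sketch: allocate a small M_DATA message, share the data block with
 * dupb(), then drop both references.  The final freeb() is the "last
 * free" and runs the dblk's db_lastfree method, returning the dblk and
 * its buffer to the appropriate cache.
 */
static void
alloc_share_free(void)
{
	mblk_t *mp, *dup;

	if ((mp = allocb(128, BPRI_MED)) == NULL)
		return;				/* no memory, back off */

	*mp->b_wptr++ = 0x5a;			/* one byte of payload */

	if ((dup = dupb(mp)) != NULL) {		/* db_ref goes 1 -> 2 */
		/* ... hand 'dup' to another consumer ... */
		freeb(dup);			/* db_ref back to 1 */
	}
	freeb(mp);				/* last reference freed */
}
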
127 127 #define DBLK_MAX_CACHE 73728
128 128 #define DBLK_CACHE_ALIGN 64
129 129 #define DBLK_MIN_SIZE 8
130 130 #define DBLK_SIZE_SHIFT 3
131 131
132 132 #ifdef _BIG_ENDIAN
133 133 #define DBLK_RTFU_SHIFT(field) \
134 134 (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
135 135 #else
136 136 #define DBLK_RTFU_SHIFT(field) \
137 137 (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
138 138 #endif
139 139
140 140 #define DBLK_RTFU(ref, type, flags, uioflag) \
141 141 (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
142 142 ((type) << DBLK_RTFU_SHIFT(db_type)) | \
143 143 (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
144 144 ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
145 145 #define DBLK_RTFU_REF_MASK (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
146 146 #define DBLK_RTFU_WORD(dbp) (*((uint32_t *)&(dbp)->db_ref))
147 147 #define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))
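
The DBLK_RTFU macros exploit the fact that db_ref, db_type, db_flags and db_struioflag are four adjacent one-byte fields, so all of them can be initialized with one aligned 32-bit store; the per-field shift is derived from the field offsets so the same expression is endian-safe. A simplified model of that trick, using a stand-in struct rather than the real dblk_t (illustrative only):

#include <sys/types.h>

/* Stand-in for the four adjacent one-byte dblk fields. */
typedef struct rtfu_model {
	uint8_t rm_ref;			/* plays the role of db_ref */
	uint8_t rm_type;		/* db_type */
	uint8_t rm_flags;		/* db_flags */
	uint8_t rm_uioflag;		/* db_struioflag */
} rtfu_model_t;

/* One aligned 32-bit store initializes all four fields at once. */
#define	RTFU_MODEL_WORD(p)	(*(uint32_t *)&(p)->rm_ref)

static void
rtfu_model_set(rtfu_model_t *p, uint32_t packed)
{
	RTFU_MODEL_WORD(p) = packed;
}
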
148 148
149 149 static size_t dblk_sizes[] = {
150 150 #ifdef _LP64
151 151 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 152 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 153 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 154 #else
155 155 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 156 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 157 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 158 #endif
159 159 DBLK_MAX_CACHE, 0
160 160 };
161 161
162 162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 163 static struct kmem_cache *mblk_cache;
164 164 static struct kmem_cache *dblk_esb_cache;
165 165 static struct kmem_cache *fthdr_cache;
166 166 static struct kmem_cache *ftblk_cache;
167 167
168 168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 169 static mblk_t *allocb_oversize(size_t size, int flags);
170 170 static int allocb_tryhard_fails;
171 171 static void frnop_func(void *arg);
172 172 frtn_t frnop = { frnop_func };
173 173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
174 174
175 175 static boolean_t rwnext_enter(queue_t *qp);
176 176 static void rwnext_exit(queue_t *qp);
177 177
178 178 /*
179 179 * Patchable mblk/dblk kmem_cache flags.
180 180 */
181 181 int dblk_kmem_flags = 0;
182 182 int mblk_kmem_flags = 0;
183 183
184 184 static int
185 185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
186 186 {
187 187 dblk_t *dbp = buf;
188 188 ssize_t msg_size = (ssize_t)cdrarg;
189 189 size_t index;
190 190
191 191 ASSERT(msg_size != 0);
192 192
193 193 index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
194 194
195 195 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
196 196
197 197 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 198 return (-1);
199 199 if ((msg_size & PAGEOFFSET) == 0) {
200 200 dbp->db_base = kmem_alloc(msg_size, kmflags);
201 201 if (dbp->db_base == NULL) {
202 202 kmem_cache_free(mblk_cache, dbp->db_mblk);
203 203 return (-1);
204 204 }
205 205 } else {
206 206 dbp->db_base = (unsigned char *)&dbp[1];
207 207 }
208 208
209 209 dbp->db_mblk->b_datap = dbp;
210 210 dbp->db_cache = dblk_cache[index];
211 211 dbp->db_lim = dbp->db_base + msg_size;
212 212 dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 213 dbp->db_frtnp = NULL;
214 214 dbp->db_fthdr = NULL;
215 215 dbp->db_credp = NULL;
216 216 dbp->db_cpid = -1;
217 217 dbp->db_struioflag = 0;
218 218 dbp->db_struioun.cksum.flags = 0;
219 219 return (0);
220 220 }
221 221
222 222 /*ARGSUSED*/
223 223 static int
224 224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
225 225 {
226 226 dblk_t *dbp = buf;
227 227
228 228 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 229 return (-1);
230 230 dbp->db_mblk->b_datap = dbp;
231 231 dbp->db_cache = dblk_esb_cache;
232 232 dbp->db_fthdr = NULL;
233 233 dbp->db_credp = NULL;
234 234 dbp->db_cpid = -1;
235 235 dbp->db_struioflag = 0;
236 236 dbp->db_struioun.cksum.flags = 0;
237 237 return (0);
238 238 }
239 239
240 240 static int
241 241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
242 242 {
243 243 dblk_t *dbp = buf;
244 244 bcache_t *bcp = cdrarg;
245 245
246 246 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 247 return (-1);
248 248
249 249 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 250 if (dbp->db_base == NULL) {
251 251 kmem_cache_free(mblk_cache, dbp->db_mblk);
252 252 return (-1);
253 253 }
254 254
255 255 dbp->db_mblk->b_datap = dbp;
256 256 dbp->db_cache = (void *)bcp;
257 257 dbp->db_lim = dbp->db_base + bcp->size;
258 258 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 259 dbp->db_frtnp = NULL;
260 260 dbp->db_fthdr = NULL;
261 261 dbp->db_credp = NULL;
262 262 dbp->db_cpid = -1;
263 263 dbp->db_struioflag = 0;
264 264 dbp->db_struioun.cksum.flags = 0;
265 265 return (0);
266 266 }
267 267
268 268 /*ARGSUSED*/
269 269 static void
270 270 dblk_destructor(void *buf, void *cdrarg)
271 271 {
272 272 dblk_t *dbp = buf;
273 273 ssize_t msg_size = (ssize_t)cdrarg;
274 274
275 275 ASSERT(dbp->db_mblk->b_datap == dbp);
276 276 ASSERT(msg_size != 0);
277 277 ASSERT(dbp->db_struioflag == 0);
278 278 ASSERT(dbp->db_struioun.cksum.flags == 0);
279 279
280 280 if ((msg_size & PAGEOFFSET) == 0) {
281 281 kmem_free(dbp->db_base, msg_size);
282 282 }
283 283
284 284 kmem_cache_free(mblk_cache, dbp->db_mblk);
285 285 }
286 286
287 287 static void
288 288 bcache_dblk_destructor(void *buf, void *cdrarg)
289 289 {
290 290 dblk_t *dbp = buf;
291 291 bcache_t *bcp = cdrarg;
292 292
293 293 kmem_cache_free(bcp->buffer_cache, dbp->db_base);
294 294
295 295 ASSERT(dbp->db_mblk->b_datap == dbp);
296 296 ASSERT(dbp->db_struioflag == 0);
297 297 ASSERT(dbp->db_struioun.cksum.flags == 0);
298 298
299 299 kmem_cache_free(mblk_cache, dbp->db_mblk);
300 300 }
301 301
302 302 /* ARGSUSED */
303 303 static int
304 304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
305 305 {
306 306 ftblk_t *fbp = buf;
307 307 int i;
308 308
309 309 bzero(fbp, sizeof (ftblk_t));
310 310 if (str_ftstack != 0) {
311 311 for (i = 0; i < FTBLK_EVNTS; i++)
312 312 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
313 313 }
314 314
315 315 return (0);
316 316 }
317 317
318 318 /* ARGSUSED */
319 319 static void
320 320 ftblk_destructor(void *buf, void *cdrarg)
321 321 {
322 322 ftblk_t *fbp = buf;
323 323 int i;
324 324
325 325 if (str_ftstack != 0) {
326 326 for (i = 0; i < FTBLK_EVNTS; i++) {
327 327 if (fbp->ev[i].stk != NULL) {
328 328 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 329 fbp->ev[i].stk = NULL;
330 330 }
331 331 }
332 332 }
333 333 }
334 334
335 335 static int
336 336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
337 337 {
338 338 fthdr_t *fhp = buf;
339 339
340 340 return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
341 341 }
342 342
343 343 static void
344 344 fthdr_destructor(void *buf, void *cdrarg)
345 345 {
346 346 fthdr_t *fhp = buf;
347 347
348 348 ftblk_destructor(&fhp->first, cdrarg);
349 349 }
350 350
351 351 void
352 352 streams_msg_init(void)
353 353 {
354 354 char name[40];
355 355 size_t size;
356 356 size_t lastsize = DBLK_MIN_SIZE;
357 357 size_t *sizep;
358 358 struct kmem_cache *cp;
359 359 size_t tot_size;
360 360 int offset;
361 361
362 362 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 363 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
364 364
365 365 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
366 366
367 367 if ((offset = (size & PAGEOFFSET)) != 0) {
368 368 /*
369 369 * We are in the middle of a page, dblk should
370 370 * be allocated on the same page
371 371 */
372 372 tot_size = size + sizeof (dblk_t);
373 373 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 374 < PAGESIZE);
375 375 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
376 376
377 377 } else {
378 378
379 379 /*
380 380 * buf size is multiple of page size, dblk and
381 381 * buffer are allocated separately.
382 382 */
383 383
384 384 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 385 tot_size = sizeof (dblk_t);
386 386 }
387 387
388 388 (void) sprintf(name, "streams_dblk_%ld", size);
389 389 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 390 dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 391 NULL, dblk_kmem_flags);
392 392
393 393 while (lastsize <= size) {
394 394 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 395 lastsize += DBLK_MIN_SIZE;
396 396 }
397 397 }
398 398
399 399 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 400 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 401 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 402 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 403 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 404 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 405 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
406 406
407 407 /* Initialize Multidata caches */
408 408 mmd_init();
409 409
410 410 /* initialize throttling queue for esballoc */
411 411 esballoc_queue_init();
412 412 }
413 413
414 414 /*ARGSUSED*/
415 415 mblk_t *
416 416 allocb(size_t size, uint_t pri)
417 417 {
418 418 dblk_t *dbp;
419 419 mblk_t *mp;
420 420 size_t index;
421 421
422 422 index = (size - 1) >> DBLK_SIZE_SHIFT;
423 423
424 424 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 425 if (size != 0) {
426 426 mp = allocb_oversize(size, KM_NOSLEEP);
427 427 goto out;
428 428 }
429 429 index = 0;
430 430 }
431 431
432 432 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 433 mp = NULL;
434 434 goto out;
435 435 }
436 436
437 437 mp = dbp->db_mblk;
438 438 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 439 mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 440 mp->b_rptr = mp->b_wptr = dbp->db_base;
441 441 mp->b_queue = NULL;
442 442 MBLK_BAND_FLAG_WORD(mp) = 0;
443 443 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 444 out:
445 445 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
446 446
447 447 return (mp);
448 448 }
449 449
450 450 /*
451 451 * Allocate an mblk taking db_credp and db_cpid from the template.
452 452 * Allow the cred to be NULL.
453 453 */
454 454 mblk_t *
455 455 allocb_tmpl(size_t size, const mblk_t *tmpl)
456 456 {
457 457 mblk_t *mp = allocb(size, 0);
458 458
459 459 if (mp != NULL) {
460 460 dblk_t *src = tmpl->b_datap;
461 461 dblk_t *dst = mp->b_datap;
462 462 cred_t *cr;
463 463 pid_t cpid;
464 464
465 465 cr = msg_getcred(tmpl, &cpid);
466 466 if (cr != NULL)
467 467 crhold(dst->db_credp = cr);
468 468 dst->db_cpid = cpid;
469 469 dst->db_type = src->db_type;
470 470 }
471 471 return (mp);
472 472 }
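
allocb_tmpl() exists so that a message generated in response to another one carries the same credentials (db_credp, db_cpid) and db_type as the original. A sketch of that pattern (illustrative only; echo_request() is not part of this file):

#include <sys/stream.h>
#include <sys/strsun.h>		/* MBLKL() */

/*
 * Sketch: build a reply the same size as the request; allocb_tmpl()
 * copies db_credp, db_cpid and db_type from the template message.
 */
static mblk_t *
echo_request(mblk_t *req)
{
	size_t len = MBLKL(req);
	mblk_t *reply;

	if ((reply = allocb_tmpl(len, req)) == NULL)
		return (NULL);

	bcopy(req->b_rptr, reply->b_wptr, len);
	reply->b_wptr += len;
	return (reply);
}
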
473 473
474 474 mblk_t *
475 475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
476 476 {
477 477 mblk_t *mp = allocb(size, 0);
478 478
479 479 ASSERT(cr != NULL);
480 480 if (mp != NULL) {
481 481 dblk_t *dbp = mp->b_datap;
482 482
483 483 crhold(dbp->db_credp = cr);
484 484 dbp->db_cpid = cpid;
485 485 }
486 486 return (mp);
487 487 }
488 488
489 489 mblk_t *
490 490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
491 491 {
492 492 mblk_t *mp = allocb_wait(size, 0, flags, error);
493 493
494 494 ASSERT(cr != NULL);
495 495 if (mp != NULL) {
496 496 dblk_t *dbp = mp->b_datap;
497 497
498 498 crhold(dbp->db_credp = cr);
499 499 dbp->db_cpid = cpid;
500 500 }
501 501
502 502 return (mp);
503 503 }
504 504
505 505 /*
506 506 * Extract the db_cred (and optionally db_cpid) from a message.
507 507 * We find the first mblk which has a non-NULL db_cred and use that.
508 508 * If none found we return NULL.
509 509 * Does NOT get a hold on the cred.
510 510 */
511 511 cred_t *
512 512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
513 513 {
514 514 cred_t *cr = NULL;
515 515 cred_t *cr2;
516 516 mblk_t *mp2;
517 517
518 518 while (mp != NULL) {
519 519 dblk_t *dbp = mp->b_datap;
520 520
521 521 cr = dbp->db_credp;
522 522 if (cr == NULL) {
523 523 mp = mp->b_cont;
524 524 continue;
525 525 }
526 526 if (cpidp != NULL)
527 527 *cpidp = dbp->db_cpid;
528 528
529 529 #ifdef DEBUG
530 530 /*
531 531 * Normally there should be at most one db_credp in a message.
532 532 * But if there are multiple (as in the case of some M_IOC*
533 533 * and some internal messages in TCP/IP bind logic) then
534 534 * they must be identical in the normal case.
535 535 * However, a socket can be shared between different uids
536 536 * in which case data queued in TCP would be from different
537 537 * creds. Thus we can only assert for the zoneid being the
538 538 * same. Due to Multi-Level Ports for TX, some
539 539 * cred_t can have a NULL cr_zone, and we skip the comparison
540 540 * in that case.
541 541 */
542 542 mp2 = mp->b_cont;
543 543 while (mp2 != NULL) {
544 544 cr2 = DB_CRED(mp2);
545 545 if (cr2 != NULL) {
546 546 DTRACE_PROBE2(msg__getcred,
547 547 cred_t *, cr, cred_t *, cr2);
548 548 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 549 crgetzone(cr) == NULL ||
550 550 crgetzone(cr2) == NULL);
551 551 }
552 552 mp2 = mp2->b_cont;
553 553 }
554 554 #endif
555 555 return (cr);
556 556 }
557 557 if (cpidp != NULL)
558 558 *cpidp = NOPID;
559 559 return (NULL);
560 560 }
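
A typical caller of msg_getcred() is a module doing a credential or zone check on an inbound message; since no hold is taken, the returned cred is only valid while the caller still owns the message. A sketch (illustrative only):

#include <sys/cred.h>
#include <sys/zone.h>

/*
 * Sketch: check whether the message's originating credential belongs
 * to the global zone.  The cred is borrowed, not held, so it must not
 * be used after the message is freed or passed on.
 */
static boolean_t
msg_from_global_zone(const mblk_t *mp)
{
	pid_t cpid;
	cred_t *cr = msg_getcred(mp, &cpid);

	if (cr == NULL)
		return (B_FALSE);	/* no credential attached */
	return (crgetzoneid(cr) == GLOBAL_ZONEID);
}
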
561 561
562 562 /*
563 563 * Variant of msg_getcred which, when a cred is found
564 564 * 1. Returns with a hold on the cred
565 565 * 2. Clears the first cred in the mblk.
566 566 * This is more efficient to use than a msg_getcred() + crhold() when
567 567 * the message is freed after the cred has been extracted.
568 568 *
569 569 * The caller is responsible for ensuring that there is no other reference
570 570 * on the message since db_credp can not be cleared when there are other
571 571 * references.
572 572 */
573 573 cred_t *
574 574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
575 575 {
576 576 cred_t *cr = NULL;
577 577 cred_t *cr2;
578 578 mblk_t *mp2;
579 579
580 580 while (mp != NULL) {
581 581 dblk_t *dbp = mp->b_datap;
582 582
583 583 cr = dbp->db_credp;
584 584 if (cr == NULL) {
585 585 mp = mp->b_cont;
586 586 continue;
587 587 }
588 588 ASSERT(dbp->db_ref == 1);
589 589 dbp->db_credp = NULL;
590 590 if (cpidp != NULL)
591 591 *cpidp = dbp->db_cpid;
592 592 #ifdef DEBUG
593 593 /*
594 594 * Normally there should be at most one db_credp in a message.
595 595 * But if there are multiple (as in the case of some M_IOC*
596 596 * and some internal messages in TCP/IP bind logic) then
597 597 * they must be identical in the normal case.
598 598 * However, a socket can be shared between different uids
599 599 * in which case data queued in TCP would be from different
600 600 * creds. Thus we can only assert for the zoneid being the
601 601 * same. Due to Multi-Level Ports for TX, some
602 602 * cred_t can have a NULL cr_zone, and we skip the comparison
603 603 * in that case.
604 604 */
605 605 mp2 = mp->b_cont;
606 606 while (mp2 != NULL) {
607 607 cr2 = DB_CRED(mp2);
608 608 if (cr2 != NULL) {
609 609 DTRACE_PROBE2(msg__extractcred,
610 610 cred_t *, cr, cred_t *, cr2);
611 611 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 612 crgetzone(cr) == NULL ||
613 613 crgetzone(cr2) == NULL);
614 614 }
615 615 mp2 = mp2->b_cont;
616 616 }
617 617 #endif
618 618 return (cr);
619 619 }
620 620 return (NULL);
621 621 }
622 622 /*
623 623 * Get the label for a message. Uses the first mblk in the message
624 624 * which has a non-NULL db_credp.
625 625 * Returns NULL if there is no credp.
626 626 */
627 627 extern struct ts_label_s *
628 628 msg_getlabel(const mblk_t *mp)
629 629 {
630 630 cred_t *cr = msg_getcred(mp, NULL);
631 631
632 632 if (cr == NULL)
633 633 return (NULL);
634 634
635 635 return (crgetlabel(cr));
636 636 }
637 637
638 638 void
639 639 freeb(mblk_t *mp)
640 640 {
641 641 dblk_t *dbp = mp->b_datap;
642 642
643 643 ASSERT(dbp->db_ref > 0);
644 644 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
645 645 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
646 646
647 647 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
648 648
649 649 dbp->db_free(mp, dbp);
650 650 }
651 651
652 652 void
653 653 freemsg(mblk_t *mp)
654 654 {
655 655 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
656 656 while (mp) {
657 657 dblk_t *dbp = mp->b_datap;
658 658 mblk_t *mp_cont = mp->b_cont;
659 659
660 660 ASSERT(dbp->db_ref > 0);
661 661 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
662 662
663 663 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
664 664
665 665 dbp->db_free(mp, dbp);
666 666 mp = mp_cont;
667 667 }
668 668 }
669 669
670 670 /*
671 671 * Reallocate a block for another use. Try hard to use the old block.
672 672 * If the old data is wanted (copy), leave b_wptr at the end of the data,
673 673 * otherwise return b_wptr = b_rptr.
674 674 *
675 675 * This routine is private and unstable.
676 676 */
677 677 mblk_t *
678 678 reallocb(mblk_t *mp, size_t size, uint_t copy)
679 679 {
680 680 mblk_t *mp1;
681 681 unsigned char *old_rptr;
682 682 ptrdiff_t cur_size;
683 683
684 684 if (mp == NULL)
685 685 return (allocb(size, BPRI_HI));
686 686
687 687 cur_size = mp->b_wptr - mp->b_rptr;
688 688 old_rptr = mp->b_rptr;
689 689
690 690 ASSERT(mp->b_datap->db_ref != 0);
691 691
692 692 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
693 693 /*
694 694 * If the data is wanted and it will fit where it is, no
695 695 * work is required.
696 696 */
697 697 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
698 698 return (mp);
699 699
700 700 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
701 701 mp1 = mp;
702 702 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
703 703 /* XXX other mp state could be copied too, db_flags ... ? */
704 704 mp1->b_cont = mp->b_cont;
705 705 } else {
706 706 return (NULL);
707 707 }
708 708
709 709 if (copy) {
710 710 bcopy(old_rptr, mp1->b_rptr, cur_size);
711 711 mp1->b_wptr = mp1->b_rptr + cur_size;
712 712 }
713 713
714 714 if (mp != mp1)
715 715 freeb(mp);
716 716
717 717 return (mp1);
718 718 }
719 719
720 720 static void
721 721 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
722 722 {
723 723 ASSERT(dbp->db_mblk == mp);
724 724 if (dbp->db_fthdr != NULL)
725 725 str_ftfree(dbp);
726 726
727 727 /* set credp and projid to be 'unspecified' before returning to cache */
728 728 if (dbp->db_credp != NULL) {
729 729 crfree(dbp->db_credp);
730 730 dbp->db_credp = NULL;
731 731 }
732 732 dbp->db_cpid = -1;
733 733
734 734 /* Reset the struioflag and the checksum flag fields */
735 735 dbp->db_struioflag = 0;
736 736 dbp->db_struioun.cksum.flags = 0;
737 737
738 738 /* and the COOKED and/or UIOA flag(s) */
739 739 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
740 740
741 741 kmem_cache_free(dbp->db_cache, dbp);
742 742 }
743 743
744 744 static void
745 745 dblk_decref(mblk_t *mp, dblk_t *dbp)
746 746 {
747 747 if (dbp->db_ref != 1) {
748 748 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
749 749 -(1 << DBLK_RTFU_SHIFT(db_ref)));
750 750 /*
751 751 * atomic_add_32_nv() just decremented db_ref, so we no longer
752 752 * have a reference to the dblk, which means another thread
753 753 * could free it. Therefore we cannot examine the dblk to
754 754 * determine whether ours was the last reference. Instead,
755 755 * we extract the new and minimum reference counts from rtfu.
756 756 * Note that all we're really saying is "if (ref != refmin)".
757 757 */
758 758 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
759 759 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
760 760 kmem_cache_free(mblk_cache, mp);
761 761 return;
762 762 }
763 763 }
764 764 dbp->db_mblk = mp;
765 765 dbp->db_free = dbp->db_lastfree;
766 766 dbp->db_lastfree(mp, dbp);
767 767 }
768 768
769 769 mblk_t *
770 770 dupb(mblk_t *mp)
771 771 {
772 772 dblk_t *dbp = mp->b_datap;
773 773 mblk_t *new_mp;
774 774 uint32_t oldrtfu, newrtfu;
775 775
776 776 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
777 777 goto out;
778 778
779 779 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
780 780 new_mp->b_rptr = mp->b_rptr;
781 781 new_mp->b_wptr = mp->b_wptr;
782 782 new_mp->b_datap = dbp;
783 783 new_mp->b_queue = NULL;
784 784 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
785 785
786 786 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
787 787
788 788 dbp->db_free = dblk_decref;
789 789 do {
790 790 ASSERT(dbp->db_ref > 0);
791 791 oldrtfu = DBLK_RTFU_WORD(dbp);
792 792 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
793 793 /*
794 794 * If db_ref is maxed out we can't dup this message anymore.
795 795 */
796 796 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
797 797 kmem_cache_free(mblk_cache, new_mp);
798 798 new_mp = NULL;
799 799 goto out;
800 800 }
801 - } while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);
801 + } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
802 + oldrtfu);
802 803
803 804 out:
804 805 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
805 806 return (new_mp);
806 807 }
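
The functional change in this file is the loop above: the deprecated cas32() becomes atomic_cas_32() from <sys/atomic.h>. Both have compare-and-swap semantics: the new value is stored only if the target still holds the expected value, and the value actually found is returned, so the loop retries whenever another thread raced in between the load and the store. The same idiom reduced to a plain counter (illustrative only):

#include <sys/atomic.h>

/*
 * Sketch: increment a 32-bit counter with a CAS retry loop, the same
 * pattern dupb() uses on the packed rtfu word.
 */
static uint32_t
counter_increment(volatile uint32_t *addr)
{
	uint32_t oval, nval;

	do {
		oval = *addr;
		nval = oval + 1;
	} while (atomic_cas_32(addr, oval, nval) != oval);

	return (nval);
}
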
807 808
808 809 static void
809 810 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
810 811 {
811 812 frtn_t *frp = dbp->db_frtnp;
812 813
813 814 ASSERT(dbp->db_mblk == mp);
814 815 frp->free_func(frp->free_arg);
815 816 if (dbp->db_fthdr != NULL)
816 817 str_ftfree(dbp);
817 818
818 819 /* set credp and projid to be 'unspecified' before returning to cache */
819 820 if (dbp->db_credp != NULL) {
820 821 crfree(dbp->db_credp);
821 822 dbp->db_credp = NULL;
822 823 }
823 824 dbp->db_cpid = -1;
824 825 dbp->db_struioflag = 0;
825 826 dbp->db_struioun.cksum.flags = 0;
826 827
827 828 kmem_cache_free(dbp->db_cache, dbp);
828 829 }
829 830
830 831 /*ARGSUSED*/
831 832 static void
832 833 frnop_func(void *arg)
833 834 {
834 835 }
835 836
836 837 /*
837 838 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
838 839 */
839 840 static mblk_t *
840 841 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
841 842 void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
842 843 {
843 844 dblk_t *dbp;
844 845 mblk_t *mp;
845 846
846 847 ASSERT(base != NULL && frp != NULL);
847 848
848 849 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
849 850 mp = NULL;
850 851 goto out;
851 852 }
852 853
853 854 mp = dbp->db_mblk;
854 855 dbp->db_base = base;
855 856 dbp->db_lim = base + size;
856 857 dbp->db_free = dbp->db_lastfree = lastfree;
857 858 dbp->db_frtnp = frp;
858 859 DBLK_RTFU_WORD(dbp) = db_rtfu;
859 860 mp->b_next = mp->b_prev = mp->b_cont = NULL;
860 861 mp->b_rptr = mp->b_wptr = base;
861 862 mp->b_queue = NULL;
862 863 MBLK_BAND_FLAG_WORD(mp) = 0;
863 864
864 865 out:
865 866 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
866 867 return (mp);
867 868 }
868 869
869 870 /*ARGSUSED*/
870 871 mblk_t *
871 872 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
872 873 {
873 874 mblk_t *mp;
874 875
875 876 /*
876 877 * Note that this is structured to allow the common case (i.e.
877 878 * STREAMS flowtracing disabled) to call gesballoc() with tail
878 879 * call optimization.
879 880 */
880 881 if (!str_ftnever) {
881 882 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
882 883 frp, freebs_enqueue, KM_NOSLEEP);
883 884
884 885 if (mp != NULL)
885 886 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
886 887 return (mp);
887 888 }
888 889
889 890 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
890 891 frp, freebs_enqueue, KM_NOSLEEP));
891 892 }
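
All four flavors funnel into gesballoc(); the difference is what happens on the last free. esballoc()/esballoca() hand the dblk to freebs_enqueue so the caller-supplied free routine runs later from the esballoc throttling queue, while desballoc()/desballoca() invoke it directly from the final freeb(). A sketch of a driver loaning a receive buffer via desballoc() (the rxb_t type and its pool are hypothetical):

/*
 * Sketch: wrap a driver-owned buffer in an mblk without copying.  The
 * frtn_t must remain valid for the life of the mblk; when the last
 * reference is freed, rxb_free() returns the buffer to the driver.
 */
typedef struct rxb {
	frtn_t		rxb_frtn;	/* free routine descriptor */
	uchar_t		*rxb_buf;	/* driver-owned data buffer */
	size_t		rxb_len;
} rxb_t;

static void
rxb_free(void *arg)
{
	rxb_t *rxb = arg;

	/* ... return rxb to the driver's receive pool ... */
}

static mblk_t *
rxb_wrap(rxb_t *rxb)
{
	rxb->rxb_frtn.free_func = rxb_free;
	rxb->rxb_frtn.free_arg = (caddr_t)rxb;

	/* desballoc(): rxb_free() runs directly from the last freeb() */
	return (desballoc(rxb->rxb_buf, rxb->rxb_len, BPRI_MED,
	    &rxb->rxb_frtn));
}
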
892 893
893 894 /*
894 895 * Same as esballoc() but sleeps waiting for memory.
895 896 */
896 897 /*ARGSUSED*/
897 898 mblk_t *
898 899 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
899 900 {
900 901 mblk_t *mp;
901 902
902 903 /*
903 904 * Note that this is structured to allow the common case (i.e.
904 905 * STREAMS flowtracing disabled) to call gesballoc() with tail
905 906 * call optimization.
906 907 */
907 908 if (!str_ftnever) {
908 909 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
909 910 frp, freebs_enqueue, KM_SLEEP);
910 911
911 912 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
912 913 return (mp);
913 914 }
914 915
915 916 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
916 917 frp, freebs_enqueue, KM_SLEEP));
917 918 }
918 919
919 920 /*ARGSUSED*/
920 921 mblk_t *
921 922 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
922 923 {
923 924 mblk_t *mp;
924 925
925 926 /*
926 927 * Note that this is structured to allow the common case (i.e.
927 928 * STREAMS flowtracing disabled) to call gesballoc() with tail
928 929 * call optimization.
929 930 */
930 931 if (!str_ftnever) {
931 932 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
932 933 frp, dblk_lastfree_desb, KM_NOSLEEP);
933 934
934 935 if (mp != NULL)
935 936 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
936 937 return (mp);
937 938 }
938 939
939 940 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
940 941 frp, dblk_lastfree_desb, KM_NOSLEEP));
941 942 }
942 943
943 944 /*ARGSUSED*/
944 945 mblk_t *
945 946 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
946 947 {
947 948 mblk_t *mp;
948 949
949 950 /*
950 951 * Note that this is structured to allow the common case (i.e.
951 952 * STREAMS flowtracing disabled) to call gesballoc() with tail
952 953 * call optimization.
953 954 */
954 955 if (!str_ftnever) {
955 956 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
956 957 frp, freebs_enqueue, KM_NOSLEEP);
957 958
958 959 if (mp != NULL)
959 960 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
960 961 return (mp);
961 962 }
962 963
963 964 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
964 965 frp, freebs_enqueue, KM_NOSLEEP));
965 966 }
966 967
967 968 /*ARGSUSED*/
968 969 mblk_t *
969 970 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
970 971 {
971 972 mblk_t *mp;
972 973
973 974 /*
974 975 * Note that this is structured to allow the common case (i.e.
975 976 * STREAMS flowtracing disabled) to call gesballoc() with tail
976 977 * call optimization.
977 978 */
978 979 if (!str_ftnever) {
979 980 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
980 981 frp, dblk_lastfree_desb, KM_NOSLEEP);
981 982
982 983 if (mp != NULL)
983 984 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
984 985 return (mp);
985 986 }
986 987
987 988 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
988 989 frp, dblk_lastfree_desb, KM_NOSLEEP));
989 990 }
990 991
991 992 static void
992 993 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
993 994 {
994 995 bcache_t *bcp = dbp->db_cache;
995 996
996 997 ASSERT(dbp->db_mblk == mp);
997 998 if (dbp->db_fthdr != NULL)
998 999 str_ftfree(dbp);
999 1000
1000 1001 /* set credp and projid to be 'unspecified' before returning to cache */
1001 1002 if (dbp->db_credp != NULL) {
1002 1003 crfree(dbp->db_credp);
1003 1004 dbp->db_credp = NULL;
1004 1005 }
1005 1006 dbp->db_cpid = -1;
1006 1007 dbp->db_struioflag = 0;
1007 1008 dbp->db_struioun.cksum.flags = 0;
1008 1009
1009 1010 mutex_enter(&bcp->mutex);
1010 1011 kmem_cache_free(bcp->dblk_cache, dbp);
1011 1012 bcp->alloc--;
1012 1013
1013 1014 if (bcp->alloc == 0 && bcp->destroy != 0) {
1014 1015 kmem_cache_destroy(bcp->dblk_cache);
1015 1016 kmem_cache_destroy(bcp->buffer_cache);
1016 1017 mutex_exit(&bcp->mutex);
1017 1018 mutex_destroy(&bcp->mutex);
1018 1019 kmem_free(bcp, sizeof (bcache_t));
1019 1020 } else {
1020 1021 mutex_exit(&bcp->mutex);
1021 1022 }
1022 1023 }
1023 1024
1024 1025 bcache_t *
1025 1026 bcache_create(char *name, size_t size, uint_t align)
1026 1027 {
1027 1028 bcache_t *bcp;
1028 1029 char buffer[255];
1029 1030
1030 1031 ASSERT((align & (align - 1)) == 0);
1031 1032
1032 1033 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1033 1034 return (NULL);
1034 1035
1035 1036 bcp->size = size;
1036 1037 bcp->align = align;
1037 1038 bcp->alloc = 0;
1038 1039 bcp->destroy = 0;
1039 1040
1040 1041 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1041 1042
1042 1043 (void) sprintf(buffer, "%s_buffer_cache", name);
1043 1044 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1044 1045 NULL, NULL, NULL, 0);
1045 1046 (void) sprintf(buffer, "%s_dblk_cache", name);
1046 1047 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1047 1048 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1048 1049 NULL, (void *)bcp, NULL, 0);
1049 1050
1050 1051 return (bcp);
1051 1052 }
1052 1053
1053 1054 void
1054 1055 bcache_destroy(bcache_t *bcp)
1055 1056 {
1056 1057 ASSERT(bcp != NULL);
1057 1058
1058 1059 mutex_enter(&bcp->mutex);
1059 1060 if (bcp->alloc == 0) {
1060 1061 kmem_cache_destroy(bcp->dblk_cache);
1061 1062 kmem_cache_destroy(bcp->buffer_cache);
1062 1063 mutex_exit(&bcp->mutex);
1063 1064 mutex_destroy(&bcp->mutex);
1064 1065 kmem_free(bcp, sizeof (bcache_t));
1065 1066 } else {
1066 1067 bcp->destroy++;
1067 1068 mutex_exit(&bcp->mutex);
1068 1069 }
1069 1070 }
1070 1071
1071 1072 /*ARGSUSED*/
1072 1073 mblk_t *
1073 1074 bcache_allocb(bcache_t *bcp, uint_t pri)
1074 1075 {
1075 1076 dblk_t *dbp;
1076 1077 mblk_t *mp = NULL;
1077 1078
1078 1079 ASSERT(bcp != NULL);
1079 1080
1080 1081 mutex_enter(&bcp->mutex);
1081 1082 if (bcp->destroy != 0) {
1082 1083 mutex_exit(&bcp->mutex);
1083 1084 goto out;
1084 1085 }
1085 1086
1086 1087 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1087 1088 mutex_exit(&bcp->mutex);
1088 1089 goto out;
1089 1090 }
1090 1091 bcp->alloc++;
1091 1092 mutex_exit(&bcp->mutex);
1092 1093
1093 1094 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1094 1095
1095 1096 mp = dbp->db_mblk;
1096 1097 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1097 1098 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1098 1099 mp->b_rptr = mp->b_wptr = dbp->db_base;
1099 1100 mp->b_queue = NULL;
1100 1101 MBLK_BAND_FLAG_WORD(mp) = 0;
1101 1102 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1102 1103 out:
1103 1104 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1104 1105
1105 1106 return (mp);
1106 1107 }
1107 1108
1108 1109 static void
1109 1110 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1110 1111 {
1111 1112 ASSERT(dbp->db_mblk == mp);
1112 1113 if (dbp->db_fthdr != NULL)
1113 1114 str_ftfree(dbp);
1114 1115
1115 1116 /* set credp and projid to be 'unspecified' before returning to cache */
1116 1117 if (dbp->db_credp != NULL) {
1117 1118 crfree(dbp->db_credp);
1118 1119 dbp->db_credp = NULL;
1119 1120 }
1120 1121 dbp->db_cpid = -1;
1121 1122 dbp->db_struioflag = 0;
1122 1123 dbp->db_struioun.cksum.flags = 0;
1123 1124
1124 1125 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1125 1126 kmem_cache_free(dbp->db_cache, dbp);
1126 1127 }
1127 1128
1128 1129 static mblk_t *
1129 1130 allocb_oversize(size_t size, int kmflags)
1130 1131 {
1131 1132 mblk_t *mp;
1132 1133 void *buf;
1133 1134
1134 1135 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1135 1136 if ((buf = kmem_alloc(size, kmflags)) == NULL)
1136 1137 return (NULL);
1137 1138 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1138 1139 &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1139 1140 kmem_free(buf, size);
1140 1141
1141 1142 if (mp != NULL)
1142 1143 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1143 1144
1144 1145 return (mp);
1145 1146 }
1146 1147
1147 1148 mblk_t *
1148 1149 allocb_tryhard(size_t target_size)
1149 1150 {
1150 1151 size_t size;
1151 1152 mblk_t *bp;
1152 1153
1153 1154 for (size = target_size; size < target_size + 512;
1154 1155 size += DBLK_CACHE_ALIGN)
1155 1156 if ((bp = allocb(size, BPRI_HI)) != NULL)
1156 1157 return (bp);
1157 1158 allocb_tryhard_fails++;
1158 1159 return (NULL);
1159 1160 }
1160 1161
1161 1162 /*
1162 1163 * This routine is consolidation private for STREAMS internal use
1163 1164 * This routine may only be called from sync routines (i.e., not
1164 1165 * from put or service procedures). It is located here (rather
1165 1166 * than strsubr.c) so that we don't have to expose all of the
1166 1167 * allocb() implementation details in header files.
1167 1168 */
1168 1169 mblk_t *
1169 1170 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1170 1171 {
1171 1172 dblk_t *dbp;
1172 1173 mblk_t *mp;
1173 1174 size_t index;
1174 1175
1175 1176 index = (size - 1) >> DBLK_SIZE_SHIFT;
1176 1177
1177 1178 if (flags & STR_NOSIG) {
1178 1179 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1179 1180 if (size != 0) {
1180 1181 mp = allocb_oversize(size, KM_SLEEP);
1181 1182 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1182 1183 (uintptr_t)mp);
1183 1184 return (mp);
1184 1185 }
1185 1186 index = 0;
1186 1187 }
1187 1188
1188 1189 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1189 1190 mp = dbp->db_mblk;
1190 1191 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1191 1192 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1192 1193 mp->b_rptr = mp->b_wptr = dbp->db_base;
1193 1194 mp->b_queue = NULL;
1194 1195 MBLK_BAND_FLAG_WORD(mp) = 0;
1195 1196 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1196 1197
1197 1198 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1198 1199
1199 1200 } else {
1200 1201 while ((mp = allocb(size, pri)) == NULL) {
1201 1202 if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1202 1203 return (NULL);
1203 1204 }
1204 1205 }
1205 1206
1206 1207 return (mp);
1207 1208 }
1208 1209
1209 1210 /*
1210 1211 * Call function 'func' with 'arg' when a class zero block can
1211 1212 * be allocated with priority 'pri'.
1212 1213 */
1213 1214 bufcall_id_t
1214 1215 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1215 1216 {
1216 1217 return (bufcall(1, pri, func, arg));
1217 1218 }
1218 1219
1219 1220 /*
1220 1221 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1221 1222 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1222 1223 * This provides consistency for all internal allocators of ioctl.
1223 1224 */
1224 1225 mblk_t *
1225 1226 mkiocb(uint_t cmd)
1226 1227 {
1227 1228 struct iocblk *ioc;
1228 1229 mblk_t *mp;
1229 1230
1230 1231 /*
1231 1232 * Allocate enough space for any of the ioctl related messages.
1232 1233 */
1233 1234 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1234 1235 return (NULL);
1235 1236
1236 1237 bzero(mp->b_rptr, sizeof (union ioctypes));
1237 1238
1238 1239 /*
1239 1240 * Set the mblk_t information and ptrs correctly.
1240 1241 */
1241 1242 mp->b_wptr += sizeof (struct iocblk);
1242 1243 mp->b_datap->db_type = M_IOCTL;
1243 1244
1244 1245 /*
1245 1246 * Fill in the fields.
1246 1247 */
1247 1248 ioc = (struct iocblk *)mp->b_rptr;
1248 1249 ioc->ioc_cmd = cmd;
1249 1250 ioc->ioc_cr = kcred;
1250 1251 ioc->ioc_id = getiocseqno();
1251 1252 ioc->ioc_flag = IOC_NATIVE;
1252 1253 return (mp);
1253 1254 }
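
mkiocb() gives in-kernel originators of ioctls a consistently initialized M_IOCTL: kcred credentials, a fresh ioc_id from getiocseqno(), and zeroed rval/error. A sketch of sending one downstream (illustrative only):

#include <sys/errno.h>

/*
 * Sketch: originate an ioctl from kernel context and send it down a
 * stream; the driver below replies with M_IOCACK or M_IOCNAK carrying
 * the same ioc_id.
 */
static int
send_kernel_ioctl(queue_t *wq, uint_t cmd)
{
	mblk_t *mp;

	if ((mp = mkiocb(cmd)) == NULL)
		return (ENOMEM);

	putnext(wq, mp);
	return (0);
}
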
1254 1255
1255 1256 /*
1256 1257 * test if block of given size can be allocated with a request of
1257 1258 * the given priority.
1258 1259 * 'pri' is no longer used, but is retained for compatibility.
1259 1260 */
1260 1261 /* ARGSUSED */
1261 1262 int
1262 1263 testb(size_t size, uint_t pri)
1263 1264 {
1264 1265 return ((size + sizeof (dblk_t)) <= kmem_avail());
1265 1266 }
1266 1267
1267 1268 /*
1268 1269 * Call function 'func' with argument 'arg' when there is a reasonably
1269 1270 * good chance that a block of size 'size' can be allocated.
1270 1271 * 'pri' is no longer used, but is retained for compatibility.
1271 1272 */
1272 1273 /* ARGSUSED */
1273 1274 bufcall_id_t
1274 1275 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1275 1276 {
1276 1277 static long bid = 1; /* always odd to save checking for zero */
1277 1278 bufcall_id_t bc_id;
1278 1279 struct strbufcall *bcp;
1279 1280
1280 1281 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1281 1282 return (0);
1282 1283
1283 1284 bcp->bc_func = func;
1284 1285 bcp->bc_arg = arg;
1285 1286 bcp->bc_size = size;
1286 1287 bcp->bc_next = NULL;
1287 1288 bcp->bc_executor = NULL;
1288 1289
1289 1290 mutex_enter(&strbcall_lock);
1290 1291 /*
1291 1292 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1292 1293 * should be no references to bcp since it may be freed by
1293 1294 * runbufcalls(). Since bcp_id field is returned, we save its value in
1294 1295 * the local var.
1295 1296 */
1296 1297 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
1297 1298
1298 1299 /*
1299 1300 * add newly allocated stream event to existing
1300 1301 * linked list of events.
1301 1302 */
1302 1303 if (strbcalls.bc_head == NULL) {
1303 1304 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1304 1305 } else {
1305 1306 strbcalls.bc_tail->bc_next = bcp;
1306 1307 strbcalls.bc_tail = bcp;
1307 1308 }
1308 1309
1309 1310 cv_signal(&strbcall_cv);
1310 1311 mutex_exit(&strbcall_lock);
1311 1312 return (bc_id);
1312 1313 }
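
The classic consumer of bufcall() is a put or service procedure whose allocb() fails: it registers a callback for when an allocation of that size should succeed, and the callback simply qenable()s the queue so the work is retried. A sketch of the pattern (illustrative only; the bufcall id would normally be stashed in per-instance state so close() can unbufcall() it):

/*
 * Sketch: allocb()-failure recovery via bufcall().
 */
static void
mymod_bufcall(void *arg)
{
	qenable((queue_t *)arg);	/* re-run the service procedure */
}

static int
mymod_alloc_or_wait(queue_t *q, size_t size)
{
	mblk_t *mp;

	if ((mp = allocb(size, BPRI_MED)) == NULL) {
		if (bufcall(size, BPRI_MED, mymod_bufcall, q) == 0) {
			/*
			 * No memory for the bufcall either; fall back
			 * to a timeout-driven retry.
			 */
		}
		return (ENOMEM);
	}
	mp->b_wptr += size;	/* pretend the payload is filled in */
	putnext(q, mp);
	return (0);
}
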
1313 1314
1314 1315 /*
1315 1316 * Cancel a bufcall request.
1316 1317 */
1317 1318 void
1318 1319 unbufcall(bufcall_id_t id)
1319 1320 {
1320 1321 strbufcall_t *bcp, *pbcp;
1321 1322
1322 1323 mutex_enter(&strbcall_lock);
1323 1324 again:
1324 1325 pbcp = NULL;
1325 1326 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1326 1327 if (id == bcp->bc_id)
1327 1328 break;
1328 1329 pbcp = bcp;
1329 1330 }
1330 1331 if (bcp) {
1331 1332 if (bcp->bc_executor != NULL) {
1332 1333 if (bcp->bc_executor != curthread) {
1333 1334 cv_wait(&bcall_cv, &strbcall_lock);
1334 1335 goto again;
1335 1336 }
1336 1337 } else {
1337 1338 if (pbcp)
1338 1339 pbcp->bc_next = bcp->bc_next;
1339 1340 else
1340 1341 strbcalls.bc_head = bcp->bc_next;
1341 1342 if (bcp == strbcalls.bc_tail)
1342 1343 strbcalls.bc_tail = pbcp;
1343 1344 kmem_free(bcp, sizeof (strbufcall_t));
1344 1345 }
1345 1346 }
1346 1347 mutex_exit(&strbcall_lock);
1347 1348 }
1348 1349
1349 1350 /*
1350 1351 * Duplicate a message block by block (uses dupb), returning
1351 1352 * a pointer to the duplicate message.
1352 1353 * Returns a non-NULL value only if the entire message
1353 1354 * was dup'd.
1354 1355 */
1355 1356 mblk_t *
1356 1357 dupmsg(mblk_t *bp)
1357 1358 {
1358 1359 mblk_t *head, *nbp;
1359 1360
1360 1361 if (!bp || !(nbp = head = dupb(bp)))
1361 1362 return (NULL);
1362 1363
1363 1364 while (bp->b_cont) {
1364 1365 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1365 1366 freemsg(head);
1366 1367 return (NULL);
1367 1368 }
1368 1369 nbp = nbp->b_cont;
1369 1370 bp = bp->b_cont;
1370 1371 }
1371 1372 return (head);
1372 1373 }
1373 1374
1374 1375 #define DUPB_NOLOAN(bp) \
1375 1376 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1376 1377 copyb((bp)) : dupb((bp)))
1377 1378
1378 1379 mblk_t *
1379 1380 dupmsg_noloan(mblk_t *bp)
1380 1381 {
1381 1382 mblk_t *head, *nbp;
1382 1383
1383 1384 if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1384 1385 ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1385 1386 return (NULL);
1386 1387
1387 1388 while (bp->b_cont) {
1388 1389 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1389 1390 freemsg(head);
1390 1391 return (NULL);
1391 1392 }
1392 1393 nbp = nbp->b_cont;
1393 1394 bp = bp->b_cont;
1394 1395 }
1395 1396 return (head);
1396 1397 }
1397 1398
1398 1399 /*
1399 1400 * Copy data from message and data block to newly allocated message and
1400 1401 * data block. Returns new message block pointer, or NULL if error.
1401 1402 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1402 1403 * as in the original even when db_base is not word aligned. (bug 1052877)
1403 1404 */
1404 1405 mblk_t *
1405 1406 copyb(mblk_t *bp)
1406 1407 {
1407 1408 mblk_t *nbp;
1408 1409 dblk_t *dp, *ndp;
1409 1410 uchar_t *base;
1410 1411 size_t size;
1411 1412 size_t unaligned;
1412 1413
1413 1414 ASSERT(bp->b_wptr >= bp->b_rptr);
1414 1415
1415 1416 dp = bp->b_datap;
1416 1417 if (dp->db_fthdr != NULL)
1417 1418 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1418 1419
1419 1420 /*
1420 1421 * Special handling for Multidata message; this should be
1421 1422 * removed once a copy-callback routine is made available.
1422 1423 */
1423 1424 if (dp->db_type == M_MULTIDATA) {
1424 1425 cred_t *cr;
1425 1426
1426 1427 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1427 1428 return (NULL);
1428 1429
1429 1430 nbp->b_flag = bp->b_flag;
1430 1431 nbp->b_band = bp->b_band;
1431 1432 ndp = nbp->b_datap;
1432 1433
1433 1434 /* See comments below on potential issues. */
1434 1435 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1435 1436
1436 1437 ASSERT(ndp->db_type == dp->db_type);
1437 1438 cr = dp->db_credp;
1438 1439 if (cr != NULL)
1439 1440 crhold(ndp->db_credp = cr);
1440 1441 ndp->db_cpid = dp->db_cpid;
1441 1442 return (nbp);
1442 1443 }
1443 1444
1444 1445 size = dp->db_lim - dp->db_base;
1445 1446 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1446 1447 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1447 1448 return (NULL);
1448 1449 nbp->b_flag = bp->b_flag;
1449 1450 nbp->b_band = bp->b_band;
1450 1451 ndp = nbp->b_datap;
1451 1452
1452 1453 /*
1453 1454 * Well, here is a potential issue. If we are trying to
1454 1455 * trace a flow, and we copy the message, we might lose
1455 1456 * information about where this message might have been.
1456 1457 * So we should inherit the FT data. On the other hand,
1457 1458 * a user might be interested only in alloc to free data.
1458 1459 * So I guess the real answer is to provide a tunable.
1459 1460 */
1460 1461 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1461 1462
1462 1463 base = ndp->db_base + unaligned;
1463 1464 bcopy(dp->db_base, ndp->db_base + unaligned, size);
1464 1465
1465 1466 nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1466 1467 nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1467 1468
1468 1469 return (nbp);
1469 1470 }
1470 1471
1471 1472 /*
1472 1473 * Copy data from message to newly allocated message using new
1473 1474 * data blocks. Returns a pointer to the new message, or NULL if error.
1474 1475 */
1475 1476 mblk_t *
1476 1477 copymsg(mblk_t *bp)
1477 1478 {
1478 1479 mblk_t *head, *nbp;
1479 1480
1480 1481 if (!bp || !(nbp = head = copyb(bp)))
1481 1482 return (NULL);
1482 1483
1483 1484 while (bp->b_cont) {
1484 1485 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1485 1486 freemsg(head);
1486 1487 return (NULL);
1487 1488 }
1488 1489 nbp = nbp->b_cont;
1489 1490 bp = bp->b_cont;
1490 1491 }
1491 1492 return (head);
1492 1493 }
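
Because dupb()/dupmsg() share data blocks, a module that wants to modify payload in place must check the reference count first and switch to a private copy when the data is shared; copymsg() (or copyb() for a single block) provides the copy. A sketch of that copy-before-write idiom (illustrative; it checks only the first block for brevity):

#include <sys/strsun.h>		/* DB_REF() */

/*
 * Sketch: ensure we hold the only reference before modifying the data.
 * If the dblk is shared, take a private copy and free the original.
 */
static mblk_t *
make_writable(mblk_t *mp)
{
	mblk_t *nmp;

	if (DB_REF(mp) == 1)
		return (mp);		/* already exclusively ours */

	if ((nmp = copymsg(mp)) == NULL)
		return (NULL);		/* caller still owns 'mp' */

	freemsg(mp);
	return (nmp);
}
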
1493 1494
1494 1495 /*
1495 1496 * link a message block to tail of message
1496 1497 */
1497 1498 void
1498 1499 linkb(mblk_t *mp, mblk_t *bp)
1499 1500 {
1500 1501 ASSERT(mp && bp);
1501 1502
1502 1503 for (; mp->b_cont; mp = mp->b_cont)
1503 1504 ;
1504 1505 mp->b_cont = bp;
1505 1506 }
1506 1507
1507 1508 /*
1508 1509 * unlink a message block from head of message
1509 1510 * return pointer to new message.
1510 1511 * NULL if message becomes empty.
1511 1512 */
1512 1513 mblk_t *
1513 1514 unlinkb(mblk_t *bp)
1514 1515 {
1515 1516 mblk_t *bp1;
1516 1517
1517 1518 bp1 = bp->b_cont;
1518 1519 bp->b_cont = NULL;
1519 1520 return (bp1);
1520 1521 }
1521 1522
1522 1523 /*
1523 1524 * remove a message block "bp" from message "mp"
1524 1525 *
1525 1526 * Return pointer to new message or NULL if no message remains.
1526 1527 * Return -1 if bp is not found in message.
1527 1528 */
1528 1529 mblk_t *
1529 1530 rmvb(mblk_t *mp, mblk_t *bp)
1530 1531 {
1531 1532 mblk_t *tmp;
1532 1533 mblk_t *lastp = NULL;
1533 1534
1534 1535 ASSERT(mp && bp);
1535 1536 for (tmp = mp; tmp; tmp = tmp->b_cont) {
1536 1537 if (tmp == bp) {
1537 1538 if (lastp)
1538 1539 lastp->b_cont = tmp->b_cont;
1539 1540 else
1540 1541 mp = tmp->b_cont;
1541 1542 tmp->b_cont = NULL;
1542 1543 return (mp);
1543 1544 }
1544 1545 lastp = tmp;
1545 1546 }
1546 1547 return ((mblk_t *)-1);
1547 1548 }
1548 1549
1549 1550 /*
1550 1551 * Concatenate and align first len bytes of common
1551 1552 * message type. Len == -1, means concat everything.
1552 1553 * Returns 1 on success, 0 on failure
1553 1554 * After the pullup, mp points to the pulled up data.
1554 1555 */
1555 1556 int
1556 1557 pullupmsg(mblk_t *mp, ssize_t len)
1557 1558 {
1558 1559 mblk_t *bp, *b_cont;
1559 1560 dblk_t *dbp;
1560 1561 ssize_t n;
1561 1562
1562 1563 ASSERT(mp->b_datap->db_ref > 0);
1563 1564 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1564 1565
1565 1566 /*
1566 1567 * We won't handle Multidata message, since it contains
1567 1568 * metadata which this function has no knowledge of; we
1568 1569 * assert on DEBUG, and return failure otherwise.
1569 1570 */
1570 1571 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1571 1572 if (mp->b_datap->db_type == M_MULTIDATA)
1572 1573 return (0);
1573 1574
1574 1575 if (len == -1) {
1575 1576 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1576 1577 return (1);
1577 1578 len = xmsgsize(mp);
1578 1579 } else {
1579 1580 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1580 1581 ASSERT(first_mblk_len >= 0);
1581 1582 /*
1582 1583 * If the length is less than that of the first mblk,
1583 1584 * we want to pull up the message into an aligned mblk.
1584 1585 * Though not part of the spec, some callers assume it.
1585 1586 */
1586 1587 if (len <= first_mblk_len) {
1587 1588 if (str_aligned(mp->b_rptr))
1588 1589 return (1);
1589 1590 len = first_mblk_len;
1590 1591 } else if (xmsgsize(mp) < len)
1591 1592 return (0);
1592 1593 }
1593 1594
1594 1595 if ((bp = allocb_tmpl(len, mp)) == NULL)
1595 1596 return (0);
1596 1597
1597 1598 dbp = bp->b_datap;
1598 1599 *bp = *mp; /* swap mblks so bp heads the old msg... */
1599 1600 mp->b_datap = dbp; /* ... and mp heads the new message */
1600 1601 mp->b_datap->db_mblk = mp;
1601 1602 bp->b_datap->db_mblk = bp;
1602 1603 mp->b_rptr = mp->b_wptr = dbp->db_base;
1603 1604
1604 1605 do {
1605 1606 ASSERT(bp->b_datap->db_ref > 0);
1606 1607 ASSERT(bp->b_wptr >= bp->b_rptr);
1607 1608 n = MIN(bp->b_wptr - bp->b_rptr, len);
1608 1609 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1609 1610 if (n > 0)
1610 1611 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1611 1612 mp->b_wptr += n;
1612 1613 bp->b_rptr += n;
1613 1614 len -= n;
1614 1615 if (bp->b_rptr != bp->b_wptr)
1615 1616 break;
1616 1617 b_cont = bp->b_cont;
1617 1618 freeb(bp);
1618 1619 bp = b_cont;
1619 1620 } while (len && bp);
1620 1621
1621 1622 mp->b_cont = bp; /* tack on whatever wasn't pulled up */
1622 1623
1623 1624 return (1);
1624 1625 }
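
The usual reason to call pullupmsg() (or the non-destructive msgpullup() below) is to read a fixed-size protocol header through a single aligned pointer even when it arrived split across mblks. A sketch (my_hdr_t is hypothetical):

#include <sys/sysmacros.h>	/* IS_P2ALIGNED() */

/*
 * Sketch: guarantee the header is contiguous and 32-bit aligned, then
 * read it in place.
 */
typedef struct my_hdr {
	uint32_t	mh_type;
	uint32_t	mh_len;
} my_hdr_t;

static int
parse_header(mblk_t *mp, my_hdr_t *out)
{
	if (MBLKL(mp) < sizeof (my_hdr_t) ||
	    !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
		if (!pullupmsg(mp, sizeof (my_hdr_t)))
			return (EINVAL);	/* too short or no memory */
	}
	*out = *(my_hdr_t *)mp->b_rptr;
	return (0);
}
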
1625 1626
1626 1627 /*
1627 1628 * Concatenate and align at least the first len bytes of common message
1628 1629 * type. Len == -1 means concatenate everything. The original message is
1629 1630 * unaltered. Returns a pointer to a new message on success, otherwise
1630 1631 * returns NULL.
1631 1632 */
1632 1633 mblk_t *
1633 1634 msgpullup(mblk_t *mp, ssize_t len)
1634 1635 {
1635 1636 mblk_t *newmp;
1636 1637 ssize_t totlen;
1637 1638 ssize_t n;
1638 1639
1639 1640 /*
1640 1641 * We won't handle Multidata message, since it contains
1641 1642 * metadata which this function has no knowledge of; we
1642 1643 * assert on DEBUG, and return failure otherwise.
1643 1644 */
1644 1645 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1645 1646 if (mp->b_datap->db_type == M_MULTIDATA)
1646 1647 return (NULL);
1647 1648
1648 1649 totlen = xmsgsize(mp);
1649 1650
1650 1651 if ((len > 0) && (len > totlen))
1651 1652 return (NULL);
1652 1653
1653 1654 /*
1654 1655 * Copy all of the first msg type into one new mblk, then dupmsg
1655 1656 * and link the rest onto this.
1656 1657 */
1657 1658
1658 1659 len = totlen;
1659 1660
1660 1661 if ((newmp = allocb_tmpl(len, mp)) == NULL)
1661 1662 return (NULL);
1662 1663
1663 1664 newmp->b_flag = mp->b_flag;
1664 1665 newmp->b_band = mp->b_band;
1665 1666
1666 1667 while (len > 0) {
1667 1668 n = mp->b_wptr - mp->b_rptr;
1668 1669 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1669 1670 if (n > 0)
1670 1671 bcopy(mp->b_rptr, newmp->b_wptr, n);
1671 1672 newmp->b_wptr += n;
1672 1673 len -= n;
1673 1674 mp = mp->b_cont;
1674 1675 }
1675 1676
1676 1677 if (mp != NULL) {
1677 1678 newmp->b_cont = dupmsg(mp);
1678 1679 if (newmp->b_cont == NULL) {
1679 1680 freemsg(newmp);
1680 1681 return (NULL);
1681 1682 }
1682 1683 }
1683 1684
1684 1685 return (newmp);
1685 1686 }
1686 1687
1687 1688 /*
1688 1689 * Trim bytes from message
1689 1690 * len > 0, trim from head
1690 1691 * len < 0, trim from tail
1691 1692 * Returns 1 on success, 0 on failure.
1692 1693 */
1693 1694 int
1694 1695 adjmsg(mblk_t *mp, ssize_t len)
1695 1696 {
1696 1697 mblk_t *bp;
1697 1698 mblk_t *save_bp = NULL;
1698 1699 mblk_t *prev_bp;
1699 1700 mblk_t *bcont;
1700 1701 unsigned char type;
1701 1702 ssize_t n;
1702 1703 int fromhead;
1703 1704 int first;
1704 1705
1705 1706 ASSERT(mp != NULL);
1706 1707 /*
1707 1708 * We won't handle Multidata message, since it contains
1708 1709 * metadata which this function has no knowledge of; we
1709 1710 * assert on DEBUG, and return failure otherwise.
1710 1711 */
1711 1712 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1712 1713 if (mp->b_datap->db_type == M_MULTIDATA)
1713 1714 return (0);
1714 1715
1715 1716 if (len < 0) {
1716 1717 fromhead = 0;
1717 1718 len = -len;
1718 1719 } else {
1719 1720 fromhead = 1;
1720 1721 }
1721 1722
1722 1723 if (xmsgsize(mp) < len)
1723 1724 return (0);
1724 1725
1725 1726 if (fromhead) {
1726 1727 first = 1;
1727 1728 while (len) {
1728 1729 ASSERT(mp->b_wptr >= mp->b_rptr);
1729 1730 n = MIN(mp->b_wptr - mp->b_rptr, len);
1730 1731 mp->b_rptr += n;
1731 1732 len -= n;
1732 1733
1733 1734 /*
1734 1735 * If this is not the first zero length
1735 1736 * message remove it
1736 1737 */
1737 1738 if (!first && (mp->b_wptr == mp->b_rptr)) {
1738 1739 bcont = mp->b_cont;
1739 1740 freeb(mp);
1740 1741 mp = save_bp->b_cont = bcont;
1741 1742 } else {
1742 1743 save_bp = mp;
1743 1744 mp = mp->b_cont;
1744 1745 }
1745 1746 first = 0;
1746 1747 }
1747 1748 } else {
1748 1749 type = mp->b_datap->db_type;
1749 1750 while (len) {
1750 1751 bp = mp;
1751 1752 save_bp = NULL;
1752 1753
1753 1754 /*
1754 1755 * Find the last message of same type
1755 1756 */
1756 1757 while (bp && bp->b_datap->db_type == type) {
1757 1758 ASSERT(bp->b_wptr >= bp->b_rptr);
1758 1759 prev_bp = save_bp;
1759 1760 save_bp = bp;
1760 1761 bp = bp->b_cont;
1761 1762 }
1762 1763 if (save_bp == NULL)
1763 1764 break;
1764 1765 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1765 1766 save_bp->b_wptr -= n;
1766 1767 len -= n;
1767 1768
1768 1769 /*
1769 1770 * If this is not the first message
1770 1771 * and we have taken away everything
1771 1772 * from this message, remove it
1772 1773 */
1773 1774
1774 1775 if ((save_bp != mp) &&
1775 1776 (save_bp->b_wptr == save_bp->b_rptr)) {
1776 1777 bcont = save_bp->b_cont;
1777 1778 freeb(save_bp);
1778 1779 prev_bp->b_cont = bcont;
1779 1780 }
1780 1781 }
1781 1782 }
1782 1783 return (1);
1783 1784 }
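A minimal sketch (not part of this change) of how a module might use adjmsg() as
described above: a positive length trims from the head of the message, a negative
length from the tail. The 4-byte header/trailer sizes and the helper name are
hypothetical.

	#include <sys/stream.h>

	/* Strip a hypothetical 4-byte header and 4-byte trailer from mp. */
	static int
	strip_framing(mblk_t *mp)
	{
		if (!adjmsg(mp, 4))	/* trim 4 bytes from the head */
			return (0);
		if (!adjmsg(mp, -4))	/* trim 4 bytes from the tail */
			return (0);
		return (1);
	}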
1784 1785
1785 1786 /*
1786 1787 * get number of data bytes in message
1787 1788 */
1788 1789 size_t
1789 1790 msgdsize(mblk_t *bp)
1790 1791 {
1791 1792 size_t count = 0;
1792 1793
1793 1794 for (; bp; bp = bp->b_cont)
1794 1795 if (bp->b_datap->db_type == M_DATA) {
1795 1796 ASSERT(bp->b_wptr >= bp->b_rptr);
1796 1797 count += bp->b_wptr - bp->b_rptr;
1797 1798 }
1798 1799 return (count);
1799 1800 }
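For illustration only (assuming the standard msgdsize()/pullupmsg() semantics from
stream.h; the helper name is invented): a caller that needs the whole M_DATA payload
contiguous might combine the two as follows.

	#include <sys/stream.h>
	#include <sys/errno.h>

	/* Hypothetical helper: make the M_DATA payload of mp contiguous. */
	static int
	flatten_payload(mblk_t *mp)
	{
		size_t len = msgdsize(mp);	/* data bytes across the b_cont chain */

		if (len > 0 && !pullupmsg(mp, -1))	/* -1: pull up everything */
			return (ENOMEM);
		return (0);
	}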
1800 1801
1801 1802 /*
1802 1803 * Get a message off head of queue
1803 1804 *
1804 1805 * If queue has no buffers then mark queue
1805 1806 * with QWANTR. (queue wants to be read by
1806 1807 * someone when data becomes available)
1807 1808 *
1808 1809 * If there is something to take off then do so.
1809 1810 * If queue falls below hi water mark turn off QFULL
1810 1811 * flag. Decrement weighted count of queue.
1811 1812 * Also turn off QWANTR because queue is being read.
1812 1813 *
1813 1814 * The queue count is maintained on a per-band basis.
1814 1815 * Priority band 0 (normal messages) uses q_count,
1815 1816 * q_lowat, etc. Non-zero priority bands use the
1816 1817 * fields in their respective qband structures
1817 1818 * (qb_count, qb_lowat, etc.) All messages appear
1818 1819 * on the same list, linked via their b_next pointers.
1819 1820 * q_first is the head of the list. q_count does
1820 1821 * not reflect the size of all the messages on the
1821 1822 * queue. It only reflects those messages in the
1822 1823 * normal band of flow. The one exception to this
1823 1824 * deals with high priority messages. They are in
1824 1825 * their own conceptual "band", but are accounted
1825 1826 * against q_count.
1826 1827 *
1827 1828 * If queue count is below the lo water mark and QWANTW
1828 1829 * is set, enable the closest backq which has a service
1829 1830 * procedure and turn off the QWANTW flag.
1830 1831 *
1831 1832 * getq could be built on top of rmvq, but isn't because
1832 1833 * of performance considerations.
1833 1834 *
1834 1835 * A note on the use of q_count and q_mblkcnt:
1835 1836 * q_count is the traditional byte count for messages that
1836 1837 * have been put on a queue. Documentation tells us that
1837 1838 * we shouldn't rely on that count, but some drivers/modules
1838 1839  * do. What was needed, however, was a mechanism to prevent
1839 1840 * runaway streams from consuming all of the resources,
1840 1841 * and particularly be able to flow control zero-length
1841 1842 * messages. q_mblkcnt is used for this purpose. It
1842 1843 * counts the number of mblk's that are being put on
1843 1844  * the queue. The intention here is that each mblk should
1844 1845 * contain one byte of data and, for the purpose of
1845 1846 * flow-control, logically does. A queue will become
1846 1847 * full when EITHER of these values (q_count and q_mblkcnt)
1847 1848 * reach the highwater mark. It will clear when BOTH
1848 1849 * of them drop below the highwater mark. And it will
1849 1850 * backenable when BOTH of them drop below the lowwater
1850 1851 * mark.
1851 1852 * With this algorithm, a driver/module might be able
1852 1853 * to find a reasonably accurate q_count, and the
1853 1854 * framework can still try and limit resource usage.
1854 1855 */
1855 1856 mblk_t *
1856 1857 getq(queue_t *q)
1857 1858 {
1858 1859 mblk_t *bp;
1859 1860 uchar_t band = 0;
1860 1861
1861 1862 bp = getq_noenab(q, 0);
1862 1863 if (bp != NULL)
1863 1864 band = bp->b_band;
1864 1865
1865 1866 /*
1866 1867 * Inlined from qbackenable().
1867 1868 * Quick check without holding the lock.
1868 1869 */
1869 1870 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1870 1871 return (bp);
1871 1872
1872 1873 qbackenable(q, band);
1873 1874 return (bp);
1874 1875 }
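A hedged sketch of the usual consumer of getq(): a service procedure that drains
its queue and puts a message back with putbq() when the downstream queue is flow
controlled. The module name is hypothetical; backenabling later re-runs the service
routine as described above.

	static int
	xxsrv(queue_t *q)	/* hypothetical module service routine */
	{
		mblk_t *mp;

		while ((mp = getq(q)) != NULL) {
			if (!canputnext(q)) {
				(void) putbq(q, mp);	/* preserve order; wait for backenable */
				break;
			}
			putnext(q, mp);
		}
		return (0);
	}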
1875 1876
1876 1877 /*
1877 1878 * Calculate number of data bytes in a single data message block taking
1878 1879 * multidata messages into account.
1879 1880 */
1880 1881
1881 1882 #define ADD_MBLK_SIZE(mp, size) \
1882 1883 if (DB_TYPE(mp) != M_MULTIDATA) { \
1883 1884 (size) += MBLKL(mp); \
1884 1885 } else { \
1885 1886 uint_t pinuse; \
1886 1887 \
1887 1888 mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
1888 1889 (size) += pinuse; \
1889 1890 }
1890 1891
1891 1892 /*
1892 1893 * Returns the number of bytes in a message (a message is defined as a
1893 1894 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1894 1895 * also return the number of distinct mblks in the message.
1895 1896 */
1896 1897 int
1897 1898 mp_cont_len(mblk_t *bp, int *mblkcnt)
1898 1899 {
1899 1900 mblk_t *mp;
1900 1901 int mblks = 0;
1901 1902 int bytes = 0;
1902 1903
1903 1904 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1904 1905 ADD_MBLK_SIZE(mp, bytes);
1905 1906 mblks++;
1906 1907 }
1907 1908
1908 1909 if (mblkcnt != NULL)
1909 1910 *mblkcnt = mblks;
1910 1911
1911 1912 return (bytes);
1912 1913 }
1913 1914
1914 1915 /*
1915 1916 * Like getq() but does not backenable. This is used by the stream
1916 1917 * head when a putback() is likely. The caller must call qbackenable()
1917 1918 * after it is done with accessing the queue.
1918 1919  * The rbytes argument to getq_noenab() allows callers to specify
1919 1920  * the maximum number of bytes to return. If the current amount on the
1920 1921 * queue is less than this then the entire message will be returned.
1921 1922 * A value of 0 returns the entire message and is equivalent to the old
1922 1923 * default behaviour prior to the addition of the rbytes argument.
1923 1924 */
1924 1925 mblk_t *
1925 1926 getq_noenab(queue_t *q, ssize_t rbytes)
1926 1927 {
1927 1928 mblk_t *bp, *mp1;
1928 1929 mblk_t *mp2 = NULL;
1929 1930 qband_t *qbp;
1930 1931 kthread_id_t freezer;
1931 1932 int bytecnt = 0, mblkcnt = 0;
1932 1933
1933 1934 /* freezestr should allow its caller to call getq/putq */
1934 1935 freezer = STREAM(q)->sd_freezer;
1935 1936 if (freezer == curthread) {
1936 1937 ASSERT(frozenstr(q));
1937 1938 ASSERT(MUTEX_HELD(QLOCK(q)));
1938 1939 } else
1939 1940 mutex_enter(QLOCK(q));
1940 1941
1941 1942 if ((bp = q->q_first) == 0) {
1942 1943 q->q_flag |= QWANTR;
1943 1944 } else {
1944 1945 /*
1945 1946 * If the caller supplied a byte threshold and there is
1946 1947 * more than this amount on the queue then break up the
1947 1948 		 * message appropriately. We can only safely do
1948 1949 * this for M_DATA messages.
1949 1950 */
1950 1951 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1951 1952 (q->q_count > rbytes)) {
1952 1953 /*
1953 1954 * Inline version of mp_cont_len() which terminates
1954 1955 * when we meet or exceed rbytes.
1955 1956 */
1956 1957 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1957 1958 mblkcnt++;
1958 1959 ADD_MBLK_SIZE(mp1, bytecnt);
1959 1960 if (bytecnt >= rbytes)
1960 1961 break;
1961 1962 }
1962 1963 /*
1963 1964 * We need to account for the following scenarios:
1964 1965 *
1965 1966 * 1) Too much data in the first message:
1966 1967 * mp1 will be the mblk which puts us over our
1967 1968 * byte limit.
1968 1969 * 2) Not enough data in the first message:
1969 1970 * mp1 will be NULL.
1970 1971 * 3) Exactly the right amount of data contained within
1971 1972 * whole mblks:
1972 1973 * mp1->b_cont will be where we break the message.
1973 1974 */
1974 1975 if (bytecnt > rbytes) {
1975 1976 /*
1976 1977 * Dup/copy mp1 and put what we don't need
1977 1978 * back onto the queue. Adjust the read/write
1978 1979 * and continuation pointers appropriately
1979 1980 * and decrement the current mblk count to
1980 1981 * reflect we are putting an mblk back onto
1981 1982 * the queue.
1982 1983 * When adjusting the message pointers, it's
1983 1984 * OK to use the existing bytecnt and the
1984 1985 * requested amount (rbytes) to calculate the
1985 1986 * the new write offset (b_wptr) of what we
1986 1987 			 * new write offset (b_wptr) of what we
1987 1988 * values when calculating the read offset of
1988 1989 * the mblk we are putting back on the queue.
1989 1990 			 * This is because the beginning (b_rptr) of the
1990 1991 * mblk represents some arbitrary point within
1991 1992 * the message.
1992 1993 * It's simplest to do this by advancing b_rptr
1993 1994 * by the new length of mp1 as we don't have to
1994 1995 * remember any intermediate state.
1995 1996 */
1996 1997 ASSERT(mp1 != NULL);
1997 1998 mblkcnt--;
1998 1999 if ((mp2 = dupb(mp1)) == NULL &&
1999 2000 (mp2 = copyb(mp1)) == NULL) {
2000 2001 bytecnt = mblkcnt = 0;
2001 2002 goto dup_failed;
2002 2003 }
2003 2004 mp2->b_cont = mp1->b_cont;
2004 2005 mp1->b_wptr -= bytecnt - rbytes;
2005 2006 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
2006 2007 mp1->b_cont = NULL;
2007 2008 bytecnt = rbytes;
2008 2009 } else {
2009 2010 /*
2010 2011 * Either there is not enough data in the first
2011 2012 * message or there is no excess data to deal
2012 2013 * with. If mp1 is NULL, we are taking the
2013 2014 * whole message. No need to do anything.
2014 2015 * Otherwise we assign mp1->b_cont to mp2 as
2015 2016 * we will be putting this back onto the head of
2016 2017 * the queue.
2017 2018 */
2018 2019 if (mp1 != NULL) {
2019 2020 mp2 = mp1->b_cont;
2020 2021 mp1->b_cont = NULL;
2021 2022 }
2022 2023 }
2023 2024 /*
2024 2025 * If mp2 is not NULL then we have part of the message
2025 2026 * to put back onto the queue.
2026 2027 */
2027 2028 if (mp2 != NULL) {
2028 2029 if ((mp2->b_next = bp->b_next) == NULL)
2029 2030 q->q_last = mp2;
2030 2031 else
2031 2032 bp->b_next->b_prev = mp2;
2032 2033 q->q_first = mp2;
2033 2034 } else {
2034 2035 if ((q->q_first = bp->b_next) == NULL)
2035 2036 q->q_last = NULL;
2036 2037 else
2037 2038 q->q_first->b_prev = NULL;
2038 2039 }
2039 2040 } else {
2040 2041 /*
2041 2042 * Either no byte threshold was supplied, there is
2042 2043 * not enough on the queue or we failed to
2043 2044 * duplicate/copy a data block. In these cases we
2044 2045 * just take the entire first message.
2045 2046 */
2046 2047 dup_failed:
2047 2048 bytecnt = mp_cont_len(bp, &mblkcnt);
2048 2049 if ((q->q_first = bp->b_next) == NULL)
2049 2050 q->q_last = NULL;
2050 2051 else
2051 2052 q->q_first->b_prev = NULL;
2052 2053 }
2053 2054 if (bp->b_band == 0) {
2054 2055 q->q_count -= bytecnt;
2055 2056 q->q_mblkcnt -= mblkcnt;
2056 2057 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2057 2058 (q->q_mblkcnt < q->q_hiwat))) {
2058 2059 q->q_flag &= ~QFULL;
2059 2060 }
2060 2061 } else {
2061 2062 int i;
2062 2063
2063 2064 ASSERT(bp->b_band <= q->q_nband);
2064 2065 ASSERT(q->q_bandp != NULL);
2065 2066 ASSERT(MUTEX_HELD(QLOCK(q)));
2066 2067 qbp = q->q_bandp;
2067 2068 i = bp->b_band;
2068 2069 while (--i > 0)
2069 2070 qbp = qbp->qb_next;
2070 2071 if (qbp->qb_first == qbp->qb_last) {
2071 2072 qbp->qb_first = NULL;
2072 2073 qbp->qb_last = NULL;
2073 2074 } else {
2074 2075 qbp->qb_first = bp->b_next;
2075 2076 }
2076 2077 qbp->qb_count -= bytecnt;
2077 2078 qbp->qb_mblkcnt -= mblkcnt;
2078 2079 if (qbp->qb_mblkcnt == 0 ||
2079 2080 ((qbp->qb_count < qbp->qb_hiwat) &&
2080 2081 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2081 2082 qbp->qb_flag &= ~QB_FULL;
2082 2083 }
2083 2084 }
2084 2085 q->q_flag &= ~QWANTR;
2085 2086 bp->b_next = NULL;
2086 2087 bp->b_prev = NULL;
2087 2088 }
2088 2089 if (freezer != curthread)
2089 2090 mutex_exit(QLOCK(q));
2090 2091
2091 2092 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);
2092 2093
2093 2094 return (bp);
2094 2095 }
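A framework-internal sketch (an assumption, not a DDI interface) of how
getq_noenab() and qbackenable() pair up as the comment above describes: the caller
remembers the band and backenables once it is finished with the queue.

	mblk_t *bp;
	uchar_t band = 0;

	if ((bp = getq_noenab(q, rbytes)) != NULL)
		band = bp->b_band;
	/* ... consume bp, or put it back ... */
	qbackenable(q, band);	/* deferred backenable, cf. getq() */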
2095 2096
2096 2097 /*
2097 2098 * Determine if a backenable is needed after removing a message in the
2098 2099 * specified band.
2099 2100 * NOTE: This routine assumes that something like getq_noenab() has been
2100 2101 * already called.
2101 2102 *
2102 2103 * For the read side it is ok to hold sd_lock across calling this (and the
2103 2104 * stream head often does).
2104 2105 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2105 2106 */
2106 2107 void
2107 2108 qbackenable(queue_t *q, uchar_t band)
2108 2109 {
2109 2110 int backenab = 0;
2110 2111 qband_t *qbp;
2111 2112 kthread_id_t freezer;
2112 2113
2113 2114 ASSERT(q);
2114 2115 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2115 2116
2116 2117 /*
2117 2118 * Quick check without holding the lock.
2118 2119 * OK since after getq() has lowered the q_count these flags
2119 2120 * would not change unless either the qbackenable() is done by
2120 2121 * another thread (which is ok) or the queue has gotten QFULL
2121 2122 * in which case another backenable will take place when the queue
2122 2123 * drops below q_lowat.
2123 2124 */
2124 2125 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2125 2126 return;
2126 2127
2127 2128 /* freezestr should allow its caller to call getq/putq */
2128 2129 freezer = STREAM(q)->sd_freezer;
2129 2130 if (freezer == curthread) {
2130 2131 ASSERT(frozenstr(q));
2131 2132 ASSERT(MUTEX_HELD(QLOCK(q)));
2132 2133 } else
2133 2134 mutex_enter(QLOCK(q));
2134 2135
2135 2136 if (band == 0) {
2136 2137 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2137 2138 q->q_mblkcnt < q->q_lowat)) {
2138 2139 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2139 2140 }
2140 2141 } else {
2141 2142 int i;
2142 2143
2143 2144 ASSERT((unsigned)band <= q->q_nband);
2144 2145 ASSERT(q->q_bandp != NULL);
2145 2146
2146 2147 qbp = q->q_bandp;
2147 2148 i = band;
2148 2149 while (--i > 0)
2149 2150 qbp = qbp->qb_next;
2150 2151
2151 2152 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2152 2153 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2153 2154 backenab = qbp->qb_flag & QB_WANTW;
2154 2155 }
2155 2156 }
2156 2157
2157 2158 if (backenab == 0) {
2158 2159 if (freezer != curthread)
2159 2160 mutex_exit(QLOCK(q));
2160 2161 return;
2161 2162 }
2162 2163
2163 2164 /* Have to drop the lock across strwakeq and backenable */
2164 2165 if (backenab & QWANTWSYNC)
2165 2166 q->q_flag &= ~QWANTWSYNC;
2166 2167 if (backenab & (QWANTW|QB_WANTW)) {
2167 2168 if (band != 0)
2168 2169 qbp->qb_flag &= ~QB_WANTW;
2169 2170 else {
2170 2171 q->q_flag &= ~QWANTW;
2171 2172 }
2172 2173 }
2173 2174
2174 2175 if (freezer != curthread)
2175 2176 mutex_exit(QLOCK(q));
2176 2177
2177 2178 if (backenab & QWANTWSYNC)
2178 2179 strwakeq(q, QWANTWSYNC);
2179 2180 if (backenab & (QWANTW|QB_WANTW))
2180 2181 backenable(q, band);
2181 2182 }
2182 2183
2183 2184 /*
2184 2185 * Remove a message from a queue. The queue count and other
2185 2186 * flow control parameters are adjusted and the back queue
2186 2187 * enabled if necessary.
2187 2188 *
2188 2189  * rmvq can be called with the stream frozen, by other utility functions
2189 2190 * holding QLOCK, and by streams modules without any locks/frozen.
2190 2191 */
2191 2192 void
2192 2193 rmvq(queue_t *q, mblk_t *mp)
2193 2194 {
2194 2195 ASSERT(mp != NULL);
2195 2196
2196 2197 rmvq_noenab(q, mp);
2197 2198 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2198 2199 /*
2199 2200 * qbackenable can handle a frozen stream but not a "random"
2200 2201 * qlock being held. Drop lock across qbackenable.
2201 2202 */
2202 2203 mutex_exit(QLOCK(q));
2203 2204 qbackenable(q, mp->b_band);
2204 2205 mutex_enter(QLOCK(q));
2205 2206 } else {
2206 2207 qbackenable(q, mp->b_band);
2207 2208 }
2208 2209 }
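A minimal sketch of the documented rmvq() usage pattern: freeze the stream, walk
q_first, remove a matching message, then unfreeze. The match predicate is
hypothetical.

	mblk_t *mp;

	freezestr(q);
	for (mp = q->q_first; mp != NULL; mp = mp->b_next) {
		if (is_stale(mp)) {	/* hypothetical predicate */
			rmvq(q, mp);
			freemsg(mp);
			break;
		}
	}
	unfreezestr(q);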
2209 2210
2210 2211 /*
2211 2212 * Like rmvq() but without any backenabling.
2212 2213 * This exists to handle SR_CONSOL_DATA in strrput().
2213 2214 */
2214 2215 void
2215 2216 rmvq_noenab(queue_t *q, mblk_t *mp)
2216 2217 {
2217 2218 int i;
2218 2219 qband_t *qbp = NULL;
2219 2220 kthread_id_t freezer;
2220 2221 int bytecnt = 0, mblkcnt = 0;
2221 2222
2222 2223 freezer = STREAM(q)->sd_freezer;
2223 2224 if (freezer == curthread) {
2224 2225 ASSERT(frozenstr(q));
2225 2226 ASSERT(MUTEX_HELD(QLOCK(q)));
2226 2227 } else if (MUTEX_HELD(QLOCK(q))) {
2227 2228 /* Don't drop lock on exit */
2228 2229 freezer = curthread;
2229 2230 } else
2230 2231 mutex_enter(QLOCK(q));
2231 2232
2232 2233 ASSERT(mp->b_band <= q->q_nband);
2233 2234 if (mp->b_band != 0) { /* Adjust band pointers */
2234 2235 ASSERT(q->q_bandp != NULL);
2235 2236 qbp = q->q_bandp;
2236 2237 i = mp->b_band;
2237 2238 while (--i > 0)
2238 2239 qbp = qbp->qb_next;
2239 2240 if (mp == qbp->qb_first) {
2240 2241 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2241 2242 qbp->qb_first = mp->b_next;
2242 2243 else
2243 2244 qbp->qb_first = NULL;
2244 2245 }
2245 2246 if (mp == qbp->qb_last) {
2246 2247 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2247 2248 qbp->qb_last = mp->b_prev;
2248 2249 else
2249 2250 qbp->qb_last = NULL;
2250 2251 }
2251 2252 }
2252 2253
2253 2254 /*
2254 2255 * Remove the message from the list.
2255 2256 */
2256 2257 if (mp->b_prev)
2257 2258 mp->b_prev->b_next = mp->b_next;
2258 2259 else
2259 2260 q->q_first = mp->b_next;
2260 2261 if (mp->b_next)
2261 2262 mp->b_next->b_prev = mp->b_prev;
2262 2263 else
2263 2264 q->q_last = mp->b_prev;
2264 2265 mp->b_next = NULL;
2265 2266 mp->b_prev = NULL;
2266 2267
2267 2268 /* Get the size of the message for q_count accounting */
2268 2269 bytecnt = mp_cont_len(mp, &mblkcnt);
2269 2270
2270 2271 if (mp->b_band == 0) { /* Perform q_count accounting */
2271 2272 q->q_count -= bytecnt;
2272 2273 q->q_mblkcnt -= mblkcnt;
2273 2274 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2274 2275 (q->q_mblkcnt < q->q_hiwat))) {
2275 2276 q->q_flag &= ~QFULL;
2276 2277 }
2277 2278 } else { /* Perform qb_count accounting */
2278 2279 qbp->qb_count -= bytecnt;
2279 2280 qbp->qb_mblkcnt -= mblkcnt;
2280 2281 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2281 2282 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2282 2283 qbp->qb_flag &= ~QB_FULL;
2283 2284 }
2284 2285 }
2285 2286 if (freezer != curthread)
2286 2287 mutex_exit(QLOCK(q));
2287 2288
2288 2289 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
2289 2290 }
2290 2291
2291 2292 /*
2292 2293 * Empty a queue.
2293 2294 * If flag is set, remove all messages. Otherwise, remove
2294 2295 * only non-control messages. If queue falls below its low
2295 2296 * water mark, and QWANTW is set, enable the nearest upstream
2296 2297 * service procedure.
2297 2298 *
2298 2299 * Historical note: when merging the M_FLUSH code in strrput with this
2299 2300 * code one difference was discovered. flushq did not have a check
2300 2301 * for q_lowat == 0 in the backenabling test.
2301 2302 *
2302 2303 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2303 2304 * if one exists on the queue.
2304 2305 */
2305 2306 void
2306 2307 flushq_common(queue_t *q, int flag, int pcproto_flag)
2307 2308 {
2308 2309 mblk_t *mp, *nmp;
2309 2310 qband_t *qbp;
2310 2311 int backenab = 0;
2311 2312 unsigned char bpri;
2312 2313 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2313 2314
2314 2315 if (q->q_first == NULL)
2315 2316 return;
2316 2317
2317 2318 mutex_enter(QLOCK(q));
2318 2319 mp = q->q_first;
2319 2320 q->q_first = NULL;
2320 2321 q->q_last = NULL;
2321 2322 q->q_count = 0;
2322 2323 q->q_mblkcnt = 0;
2323 2324 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2324 2325 qbp->qb_first = NULL;
2325 2326 qbp->qb_last = NULL;
2326 2327 qbp->qb_count = 0;
2327 2328 qbp->qb_mblkcnt = 0;
2328 2329 qbp->qb_flag &= ~QB_FULL;
2329 2330 }
2330 2331 q->q_flag &= ~QFULL;
2331 2332 mutex_exit(QLOCK(q));
2332 2333 while (mp) {
2333 2334 nmp = mp->b_next;
2334 2335 mp->b_next = mp->b_prev = NULL;
2335 2336
2336 2337 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);
2337 2338
2338 2339 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2339 2340 (void) putq(q, mp);
2340 2341 else if (flag || datamsg(mp->b_datap->db_type))
2341 2342 freemsg(mp);
2342 2343 else
2343 2344 (void) putq(q, mp);
2344 2345 mp = nmp;
2345 2346 }
2346 2347 bpri = 1;
2347 2348 mutex_enter(QLOCK(q));
2348 2349 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2349 2350 if ((qbp->qb_flag & QB_WANTW) &&
2350 2351 (((qbp->qb_count < qbp->qb_lowat) &&
2351 2352 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2352 2353 qbp->qb_lowat == 0)) {
2353 2354 qbp->qb_flag &= ~QB_WANTW;
2354 2355 backenab = 1;
2355 2356 qbf[bpri] = 1;
2356 2357 } else
2357 2358 qbf[bpri] = 0;
2358 2359 bpri++;
2359 2360 }
2360 2361 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2361 2362 if ((q->q_flag & QWANTW) &&
2362 2363 (((q->q_count < q->q_lowat) &&
2363 2364 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2364 2365 q->q_flag &= ~QWANTW;
2365 2366 backenab = 1;
2366 2367 qbf[0] = 1;
2367 2368 } else
2368 2369 qbf[0] = 0;
2369 2370
2370 2371 /*
2371 2372 * If any band can now be written to, and there is a writer
2372 2373 * for that band, then backenable the closest service procedure.
2373 2374 */
2374 2375 if (backenab) {
2375 2376 mutex_exit(QLOCK(q));
2376 2377 for (bpri = q->q_nband; bpri != 0; bpri--)
2377 2378 if (qbf[bpri])
2378 2379 backenable(q, bpri);
2379 2380 if (qbf[0])
2380 2381 backenable(q, 0);
2381 2382 } else
2382 2383 mutex_exit(QLOCK(q));
2383 2384 }
2384 2385
2385 2386 /*
2386 2387  * The real flushing takes place in flushq_common. This is done so that
2387 2388  * a flag can specify whether or not M_PCPROTO messages should be flushed.
2388 2389  * Currently the only place that uses this flag is the stream head.
2389 2390 */
2390 2391 void
2391 2392 flushq(queue_t *q, int flag)
2392 2393 {
2393 2394 flushq_common(q, flag, 0);
2394 2395 }
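A sketch of the canonical M_FLUSH handling in a module write put procedure, which
is where flushq() is normally called (the module names are hypothetical).

	static int
	xxwput(queue_t *q, mblk_t *mp)
	{
		if (DB_TYPE(mp) == M_FLUSH) {
			if (*mp->b_rptr & FLUSHW)
				flushq(q, FLUSHDATA);		/* flush our write queue */
			if (*mp->b_rptr & FLUSHR) {
				flushq(RD(q), FLUSHDATA);
				*mp->b_rptr &= ~FLUSHW;
				qreply(q, mp);			/* reflect the flush upstream */
			} else {
				freemsg(mp);
			}
			return (0);
		}
		(void) putq(q, mp);
		return (0);
	}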
2395 2396
2396 2397 /*
2397 2398 * Flush the queue of messages of the given priority band.
2398 2399 * There is some duplication of code between flushq and flushband.
2399 2400 * This is because we want to optimize the code as much as possible.
2400 2401 * The assumption is that there will be more messages in the normal
2401 2402 * (priority 0) band than in any other.
2402 2403 *
2403 2404 * Historical note: when merging the M_FLUSH code in strrput with this
2404 2405  * code one difference was discovered. flushband had an extra check
2405 2406  * for (mp->b_datap->db_type < QPCTL) in the band 0
2406 2407 * case. That check does not match the man page for flushband and was not
2407 2408 * in the strrput flush code hence it was removed.
2408 2409 */
2409 2410 void
2410 2411 flushband(queue_t *q, unsigned char pri, int flag)
2411 2412 {
2412 2413 mblk_t *mp;
2413 2414 mblk_t *nmp;
2414 2415 mblk_t *last;
2415 2416 qband_t *qbp;
2416 2417 int band;
2417 2418
2418 2419 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2419 2420 if (pri > q->q_nband) {
2420 2421 return;
2421 2422 }
2422 2423 mutex_enter(QLOCK(q));
2423 2424 if (pri == 0) {
2424 2425 mp = q->q_first;
2425 2426 q->q_first = NULL;
2426 2427 q->q_last = NULL;
2427 2428 q->q_count = 0;
2428 2429 q->q_mblkcnt = 0;
2429 2430 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2430 2431 qbp->qb_first = NULL;
2431 2432 qbp->qb_last = NULL;
2432 2433 qbp->qb_count = 0;
2433 2434 qbp->qb_mblkcnt = 0;
2434 2435 qbp->qb_flag &= ~QB_FULL;
2435 2436 }
2436 2437 q->q_flag &= ~QFULL;
2437 2438 mutex_exit(QLOCK(q));
2438 2439 while (mp) {
2439 2440 nmp = mp->b_next;
2440 2441 mp->b_next = mp->b_prev = NULL;
2441 2442 if ((mp->b_band == 0) &&
2442 2443 ((flag == FLUSHALL) ||
2443 2444 datamsg(mp->b_datap->db_type)))
2444 2445 freemsg(mp);
2445 2446 else
2446 2447 (void) putq(q, mp);
2447 2448 mp = nmp;
2448 2449 }
2449 2450 mutex_enter(QLOCK(q));
2450 2451 if ((q->q_flag & QWANTW) &&
2451 2452 (((q->q_count < q->q_lowat) &&
2452 2453 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2453 2454 q->q_flag &= ~QWANTW;
2454 2455 mutex_exit(QLOCK(q));
2455 2456
2456 2457 backenable(q, pri);
2457 2458 } else
2458 2459 mutex_exit(QLOCK(q));
2459 2460 } else { /* pri != 0 */
2460 2461 boolean_t flushed = B_FALSE;
2461 2462 band = pri;
2462 2463
2463 2464 ASSERT(MUTEX_HELD(QLOCK(q)));
2464 2465 qbp = q->q_bandp;
2465 2466 while (--band > 0)
2466 2467 qbp = qbp->qb_next;
2467 2468 mp = qbp->qb_first;
2468 2469 if (mp == NULL) {
2469 2470 mutex_exit(QLOCK(q));
2470 2471 return;
2471 2472 }
2472 2473 last = qbp->qb_last->b_next;
2473 2474 /*
2474 2475 * rmvq_noenab() and freemsg() are called for each mblk that
2475 2476 * meets the criteria. The loop is executed until the last
2476 2477 * mblk has been processed.
2477 2478 */
2478 2479 while (mp != last) {
2479 2480 ASSERT(mp->b_band == pri);
2480 2481 nmp = mp->b_next;
2481 2482 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2482 2483 rmvq_noenab(q, mp);
2483 2484 freemsg(mp);
2484 2485 flushed = B_TRUE;
2485 2486 }
2486 2487 mp = nmp;
2487 2488 }
2488 2489 mutex_exit(QLOCK(q));
2489 2490
2490 2491 /*
2491 2492 * If any mblk(s) has been freed, we know that qbackenable()
2492 2493 * will need to be called.
2493 2494 */
2494 2495 if (flushed)
2495 2496 qbackenable(q, pri);
2496 2497 }
2497 2498 }
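When the M_FLUSH message carries FLUSHBAND, the same handler typically calls
flushband() for the indicated band instead of flushq(); a minimal sketch:

	if (*mp->b_rptr & FLUSHBAND)
		flushband(q, *(mp->b_rptr + 1), FLUSHDATA);	/* band follows the flag byte */
	else
		flushq(q, FLUSHDATA);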
2498 2499
2499 2500 /*
2500 2501 * Return 1 if the queue is not full. If the queue is full, return
2501 2502 * 0 (may not put message) and set QWANTW flag (caller wants to write
2502 2503 * to the queue).
2503 2504 */
2504 2505 int
2505 2506 canput(queue_t *q)
2506 2507 {
2507 2508 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2508 2509
2509 2510 /* this is for loopback transports, they should not do a canput */
2510 2511 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2511 2512
2512 2513 /* Find next forward module that has a service procedure */
2513 2514 q = q->q_nfsrv;
2514 2515
2515 2516 if (!(q->q_flag & QFULL)) {
2516 2517 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2517 2518 return (1);
2518 2519 }
2519 2520 mutex_enter(QLOCK(q));
2520 2521 if (q->q_flag & QFULL) {
2521 2522 q->q_flag |= QWANTW;
2522 2523 mutex_exit(QLOCK(q));
2523 2524 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2524 2525 return (0);
2525 2526 }
2526 2527 mutex_exit(QLOCK(q));
2527 2528 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2528 2529 return (1);
2529 2530 }
2530 2531
2531 2532 /*
2532 2533 * This is the new canput for use with priority bands. Return 1 if the
2533 2534 * band is not full. If the band is full, return 0 (may not put message)
2534 2535 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2535 2536 * write to the queue).
2536 2537 */
2537 2538 int
2538 2539 bcanput(queue_t *q, unsigned char pri)
2539 2540 {
2540 2541 qband_t *qbp;
2541 2542
2542 2543 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2543 2544 if (!q)
2544 2545 return (0);
2545 2546
2546 2547 /* Find next forward module that has a service procedure */
2547 2548 q = q->q_nfsrv;
2548 2549
2549 2550 mutex_enter(QLOCK(q));
2550 2551 if (pri == 0) {
2551 2552 if (q->q_flag & QFULL) {
2552 2553 q->q_flag |= QWANTW;
2553 2554 mutex_exit(QLOCK(q));
2554 2555 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2555 2556 "bcanput:%p %X %d", q, pri, 0);
2556 2557 return (0);
2557 2558 }
2558 2559 } else { /* pri != 0 */
2559 2560 if (pri > q->q_nband) {
2560 2561 /*
2561 2562 * No band exists yet, so return success.
2562 2563 */
2563 2564 mutex_exit(QLOCK(q));
2564 2565 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2565 2566 "bcanput:%p %X %d", q, pri, 1);
2566 2567 return (1);
2567 2568 }
2568 2569 qbp = q->q_bandp;
2569 2570 while (--pri)
2570 2571 qbp = qbp->qb_next;
2571 2572 if (qbp->qb_flag & QB_FULL) {
2572 2573 qbp->qb_flag |= QB_WANTW;
2573 2574 mutex_exit(QLOCK(q));
2574 2575 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2575 2576 "bcanput:%p %X %d", q, pri, 0);
2576 2577 return (0);
2577 2578 }
2578 2579 }
2579 2580 mutex_exit(QLOCK(q));
2580 2581 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2581 2582 "bcanput:%p %X %d", q, pri, 1);
2582 2583 return (1);
2583 2584 }
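A sketch of band-aware flow control (pri == 0 behaves like canput()), shown here
with the MT-safe bcanputnext() wrapper around the same check; the surrounding put
routine is assumed, not shown.

	if (bcanputnext(q, mp->b_band)) {
		putnext(q, mp);
	} else {
		(void) putq(q, mp);	/* backenable will re-run the service routine */
	}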
2584 2585
2585 2586 /*
2586 2587 * Put a message on a queue.
2587 2588 *
2588 2589 * Messages are enqueued on a priority basis. The priority classes
2589 2590 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2590 2591 * and B_NORMAL (type < QPCTL && band == 0).
2591 2592 *
2592 2593 * Add appropriate weighted data block sizes to queue count.
2593 2594 * If queue hits high water mark then set QFULL flag.
2594 2595 *
2595 2596 * If QNOENAB is not set (putq is allowed to enable the queue),
2596 2597 * enable the queue only if the message is PRIORITY,
2597 2598 * or the QWANTR flag is set (indicating that the service procedure
2598 2599  * is ready to read the queue). This implies that a service
2599 2600 * procedure must NEVER put a high priority message back on its own
2600 2601 * queue, as this would result in an infinite loop (!).
2601 2602 */
2602 2603 int
2603 2604 putq(queue_t *q, mblk_t *bp)
2604 2605 {
2605 2606 mblk_t *tmp;
2606 2607 qband_t *qbp = NULL;
2607 2608 int mcls = (int)queclass(bp);
2608 2609 kthread_id_t freezer;
2609 2610 int bytecnt = 0, mblkcnt = 0;
2610 2611
2611 2612 freezer = STREAM(q)->sd_freezer;
2612 2613 if (freezer == curthread) {
2613 2614 ASSERT(frozenstr(q));
2614 2615 ASSERT(MUTEX_HELD(QLOCK(q)));
2615 2616 } else
2616 2617 mutex_enter(QLOCK(q));
2617 2618
2618 2619 /*
2619 2620 * Make sanity checks and if qband structure is not yet
2620 2621 * allocated, do so.
2621 2622 */
2622 2623 if (mcls == QPCTL) {
2623 2624 if (bp->b_band != 0)
2624 2625 bp->b_band = 0; /* force to be correct */
2625 2626 } else if (bp->b_band != 0) {
2626 2627 int i;
2627 2628 qband_t **qbpp;
2628 2629
2629 2630 if (bp->b_band > q->q_nband) {
2630 2631
2631 2632 /*
2632 2633 * The qband structure for this priority band is
2633 2634 * not on the queue yet, so we have to allocate
2634 2635 * one on the fly. It would be wasteful to
2635 2636 * associate the qband structures with every
2636 2637 * queue when the queues are allocated. This is
2637 2638 * because most queues will only need the normal
2638 2639 * band of flow which can be described entirely
2639 2640 * by the queue itself.
2640 2641 */
2641 2642 qbpp = &q->q_bandp;
2642 2643 while (*qbpp)
2643 2644 qbpp = &(*qbpp)->qb_next;
2644 2645 while (bp->b_band > q->q_nband) {
2645 2646 if ((*qbpp = allocband()) == NULL) {
2646 2647 if (freezer != curthread)
2647 2648 mutex_exit(QLOCK(q));
2648 2649 return (0);
2649 2650 }
2650 2651 (*qbpp)->qb_hiwat = q->q_hiwat;
2651 2652 (*qbpp)->qb_lowat = q->q_lowat;
2652 2653 q->q_nband++;
2653 2654 qbpp = &(*qbpp)->qb_next;
2654 2655 }
2655 2656 }
2656 2657 ASSERT(MUTEX_HELD(QLOCK(q)));
2657 2658 qbp = q->q_bandp;
2658 2659 i = bp->b_band;
2659 2660 while (--i)
2660 2661 qbp = qbp->qb_next;
2661 2662 }
2662 2663
2663 2664 /*
2664 2665 * If queue is empty, add the message and initialize the pointers.
2665 2666 * Otherwise, adjust message pointers and queue pointers based on
2666 2667 * the type of the message and where it belongs on the queue. Some
2667 2668 * code is duplicated to minimize the number of conditionals and
2668 2669 * hopefully minimize the amount of time this routine takes.
2669 2670 */
2670 2671 if (!q->q_first) {
2671 2672 bp->b_next = NULL;
2672 2673 bp->b_prev = NULL;
2673 2674 q->q_first = bp;
2674 2675 q->q_last = bp;
2675 2676 if (qbp) {
2676 2677 qbp->qb_first = bp;
2677 2678 qbp->qb_last = bp;
2678 2679 }
2679 2680 } else if (!qbp) { /* bp->b_band == 0 */
2680 2681
2681 2682 /*
2682 2683 * If queue class of message is less than or equal to
2683 2684 * that of the last one on the queue, tack on to the end.
2684 2685 */
2685 2686 tmp = q->q_last;
2686 2687 if (mcls <= (int)queclass(tmp)) {
2687 2688 bp->b_next = NULL;
2688 2689 bp->b_prev = tmp;
2689 2690 tmp->b_next = bp;
2690 2691 q->q_last = bp;
2691 2692 } else {
2692 2693 tmp = q->q_first;
2693 2694 while ((int)queclass(tmp) >= mcls)
2694 2695 tmp = tmp->b_next;
2695 2696
2696 2697 /*
2697 2698 * Insert bp before tmp.
2698 2699 */
2699 2700 bp->b_next = tmp;
2700 2701 bp->b_prev = tmp->b_prev;
2701 2702 if (tmp->b_prev)
2702 2703 tmp->b_prev->b_next = bp;
2703 2704 else
2704 2705 q->q_first = bp;
2705 2706 tmp->b_prev = bp;
2706 2707 }
2707 2708 } else { /* bp->b_band != 0 */
2708 2709 if (qbp->qb_first) {
2709 2710 tmp = qbp->qb_last;
2710 2711
2711 2712 /*
2712 2713 * Insert bp after the last message in this band.
2713 2714 */
2714 2715 bp->b_next = tmp->b_next;
2715 2716 if (tmp->b_next)
2716 2717 tmp->b_next->b_prev = bp;
2717 2718 else
2718 2719 q->q_last = bp;
2719 2720 bp->b_prev = tmp;
2720 2721 tmp->b_next = bp;
2721 2722 } else {
2722 2723 tmp = q->q_last;
2723 2724 if ((mcls < (int)queclass(tmp)) ||
2724 2725 (bp->b_band <= tmp->b_band)) {
2725 2726
2726 2727 /*
2727 2728 * Tack bp on end of queue.
2728 2729 */
2729 2730 bp->b_next = NULL;
2730 2731 bp->b_prev = tmp;
2731 2732 tmp->b_next = bp;
2732 2733 q->q_last = bp;
2733 2734 } else {
2734 2735 tmp = q->q_first;
2735 2736 while (tmp->b_datap->db_type >= QPCTL)
2736 2737 tmp = tmp->b_next;
2737 2738 while (tmp->b_band >= bp->b_band)
2738 2739 tmp = tmp->b_next;
2739 2740
2740 2741 /*
2741 2742 * Insert bp before tmp.
2742 2743 */
2743 2744 bp->b_next = tmp;
2744 2745 bp->b_prev = tmp->b_prev;
2745 2746 if (tmp->b_prev)
2746 2747 tmp->b_prev->b_next = bp;
2747 2748 else
2748 2749 q->q_first = bp;
2749 2750 tmp->b_prev = bp;
2750 2751 }
2751 2752 qbp->qb_first = bp;
2752 2753 }
2753 2754 qbp->qb_last = bp;
2754 2755 }
2755 2756
2756 2757 /* Get message byte count for q_count accounting */
2757 2758 bytecnt = mp_cont_len(bp, &mblkcnt);
2758 2759
2759 2760 if (qbp) {
2760 2761 qbp->qb_count += bytecnt;
2761 2762 qbp->qb_mblkcnt += mblkcnt;
2762 2763 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2763 2764 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2764 2765 qbp->qb_flag |= QB_FULL;
2765 2766 }
2766 2767 } else {
2767 2768 q->q_count += bytecnt;
2768 2769 q->q_mblkcnt += mblkcnt;
2769 2770 if ((q->q_count >= q->q_hiwat) ||
2770 2771 (q->q_mblkcnt >= q->q_hiwat)) {
2771 2772 q->q_flag |= QFULL;
2772 2773 }
2773 2774 }
2774 2775
2775 2776 STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);
2776 2777
2777 2778 if ((mcls > QNORM) ||
2778 2779 (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
2779 2780 qenable_locked(q);
2780 2781 ASSERT(MUTEX_HELD(QLOCK(q)));
2781 2782 if (freezer != curthread)
2782 2783 mutex_exit(QLOCK(q));
2783 2784
2784 2785 return (1);
2785 2786 }
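The usual caller of putq() is a put procedure that defers work to the service
procedure whenever downstream flow control is asserted; high-priority messages
bypass the check, matching the enabling rules described above. A hedged sketch
(hypothetical module):

	static int
	xxput(queue_t *q, mblk_t *mp)
	{
		if (queclass(mp) == QPCTL || canputnext(q)) {
			putnext(q, mp);		/* fast path */
		} else {
			(void) putq(q, mp);	/* queue for xxsrv(); may enable q */
		}
		return (0);
	}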
2786 2787
2787 2788 /*
2788 2789 * Put stuff back at beginning of Q according to priority order.
2789 2790 * See comment on putq above for details.
2790 2791 */
2791 2792 int
2792 2793 putbq(queue_t *q, mblk_t *bp)
2793 2794 {
2794 2795 mblk_t *tmp;
2795 2796 qband_t *qbp = NULL;
2796 2797 int mcls = (int)queclass(bp);
2797 2798 kthread_id_t freezer;
2798 2799 int bytecnt = 0, mblkcnt = 0;
2799 2800
2800 2801 ASSERT(q && bp);
2801 2802 ASSERT(bp->b_next == NULL);
2802 2803 freezer = STREAM(q)->sd_freezer;
2803 2804 if (freezer == curthread) {
2804 2805 ASSERT(frozenstr(q));
2805 2806 ASSERT(MUTEX_HELD(QLOCK(q)));
2806 2807 } else
2807 2808 mutex_enter(QLOCK(q));
2808 2809
2809 2810 /*
2810 2811 * Make sanity checks and if qband structure is not yet
2811 2812 * allocated, do so.
2812 2813 */
2813 2814 if (mcls == QPCTL) {
2814 2815 if (bp->b_band != 0)
2815 2816 bp->b_band = 0; /* force to be correct */
2816 2817 } else if (bp->b_band != 0) {
2817 2818 int i;
2818 2819 qband_t **qbpp;
2819 2820
2820 2821 if (bp->b_band > q->q_nband) {
2821 2822 qbpp = &q->q_bandp;
2822 2823 while (*qbpp)
2823 2824 qbpp = &(*qbpp)->qb_next;
2824 2825 while (bp->b_band > q->q_nband) {
2825 2826 if ((*qbpp = allocband()) == NULL) {
2826 2827 if (freezer != curthread)
2827 2828 mutex_exit(QLOCK(q));
2828 2829 return (0);
2829 2830 }
2830 2831 (*qbpp)->qb_hiwat = q->q_hiwat;
2831 2832 (*qbpp)->qb_lowat = q->q_lowat;
2832 2833 q->q_nband++;
2833 2834 qbpp = &(*qbpp)->qb_next;
2834 2835 }
2835 2836 }
2836 2837 qbp = q->q_bandp;
2837 2838 i = bp->b_band;
2838 2839 while (--i)
2839 2840 qbp = qbp->qb_next;
2840 2841 }
2841 2842
2842 2843 /*
2843 2844 * If queue is empty or if message is high priority,
2844 2845 * place on the front of the queue.
2845 2846 */
2846 2847 tmp = q->q_first;
2847 2848 if ((!tmp) || (mcls == QPCTL)) {
2848 2849 bp->b_next = tmp;
2849 2850 if (tmp)
2850 2851 tmp->b_prev = bp;
2851 2852 else
2852 2853 q->q_last = bp;
2853 2854 q->q_first = bp;
2854 2855 bp->b_prev = NULL;
2855 2856 if (qbp) {
2856 2857 qbp->qb_first = bp;
2857 2858 qbp->qb_last = bp;
2858 2859 }
2859 2860 } else if (qbp) { /* bp->b_band != 0 */
2860 2861 tmp = qbp->qb_first;
2861 2862 if (tmp) {
2862 2863
2863 2864 /*
2864 2865 * Insert bp before the first message in this band.
2865 2866 */
2866 2867 bp->b_next = tmp;
2867 2868 bp->b_prev = tmp->b_prev;
2868 2869 if (tmp->b_prev)
2869 2870 tmp->b_prev->b_next = bp;
2870 2871 else
2871 2872 q->q_first = bp;
2872 2873 tmp->b_prev = bp;
2873 2874 } else {
2874 2875 tmp = q->q_last;
2875 2876 if ((mcls < (int)queclass(tmp)) ||
2876 2877 (bp->b_band < tmp->b_band)) {
2877 2878
2878 2879 /*
2879 2880 * Tack bp on end of queue.
2880 2881 */
2881 2882 bp->b_next = NULL;
2882 2883 bp->b_prev = tmp;
2883 2884 tmp->b_next = bp;
2884 2885 q->q_last = bp;
2885 2886 } else {
2886 2887 tmp = q->q_first;
2887 2888 while (tmp->b_datap->db_type >= QPCTL)
2888 2889 tmp = tmp->b_next;
2889 2890 while (tmp->b_band > bp->b_band)
2890 2891 tmp = tmp->b_next;
2891 2892
2892 2893 /*
2893 2894 * Insert bp before tmp.
2894 2895 */
2895 2896 bp->b_next = tmp;
2896 2897 bp->b_prev = tmp->b_prev;
2897 2898 if (tmp->b_prev)
2898 2899 tmp->b_prev->b_next = bp;
2899 2900 else
2900 2901 q->q_first = bp;
2901 2902 tmp->b_prev = bp;
2902 2903 }
2903 2904 qbp->qb_last = bp;
2904 2905 }
2905 2906 qbp->qb_first = bp;
2906 2907 } else { /* bp->b_band == 0 && !QPCTL */
2907 2908
2908 2909 /*
2909 2910 * If the queue class or band is less than that of the last
2910 2911 * message on the queue, tack bp on the end of the queue.
2911 2912 */
2912 2913 tmp = q->q_last;
2913 2914 if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
2914 2915 bp->b_next = NULL;
2915 2916 bp->b_prev = tmp;
2916 2917 tmp->b_next = bp;
2917 2918 q->q_last = bp;
2918 2919 } else {
2919 2920 tmp = q->q_first;
2920 2921 while (tmp->b_datap->db_type >= QPCTL)
2921 2922 tmp = tmp->b_next;
2922 2923 while (tmp->b_band > bp->b_band)
2923 2924 tmp = tmp->b_next;
2924 2925
2925 2926 /*
2926 2927 * Insert bp before tmp.
2927 2928 */
2928 2929 bp->b_next = tmp;
2929 2930 bp->b_prev = tmp->b_prev;
2930 2931 if (tmp->b_prev)
2931 2932 tmp->b_prev->b_next = bp;
2932 2933 else
2933 2934 q->q_first = bp;
2934 2935 tmp->b_prev = bp;
2935 2936 }
2936 2937 }
2937 2938
2938 2939 /* Get message byte count for q_count accounting */
2939 2940 bytecnt = mp_cont_len(bp, &mblkcnt);
2940 2941
2941 2942 if (qbp) {
2942 2943 qbp->qb_count += bytecnt;
2943 2944 qbp->qb_mblkcnt += mblkcnt;
2944 2945 if ((qbp->qb_count >= qbp->qb_hiwat) ||
2945 2946 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
2946 2947 qbp->qb_flag |= QB_FULL;
2947 2948 }
2948 2949 } else {
2949 2950 q->q_count += bytecnt;
2950 2951 q->q_mblkcnt += mblkcnt;
2951 2952 if ((q->q_count >= q->q_hiwat) ||
2952 2953 (q->q_mblkcnt >= q->q_hiwat)) {
2953 2954 q->q_flag |= QFULL;
2954 2955 }
2955 2956 }
2956 2957
2957 2958 STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);
2958 2959
2959 2960 if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
2960 2961 qenable_locked(q);
2961 2962 ASSERT(MUTEX_HELD(QLOCK(q)));
2962 2963 if (freezer != curthread)
2963 2964 mutex_exit(QLOCK(q));
2964 2965
2965 2966 return (1);
2966 2967 }
2967 2968
2968 2969 /*
2969 2970 * Insert a message before an existing message on the queue. If the
2970 2971  * existing message is NULL, the new message is placed on the end of
2971 2972 * the queue. The queue class of the new message is ignored. However,
2972 2973 * the priority band of the new message must adhere to the following
2973 2974 * ordering:
2974 2975 *
2975 2976 * emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2976 2977 *
2977 2978 * All flow control parameters are updated.
2978 2979 *
2979 2980  * insq can be called with the stream frozen, by other utility functions
2980 2981 * holding QLOCK, and by streams modules without any locks/frozen.
2981 2982 */
2982 2983 int
2983 2984 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2984 2985 {
2985 2986 mblk_t *tmp;
2986 2987 qband_t *qbp = NULL;
2987 2988 int mcls = (int)queclass(mp);
2988 2989 kthread_id_t freezer;
2989 2990 int bytecnt = 0, mblkcnt = 0;
2990 2991
2991 2992 freezer = STREAM(q)->sd_freezer;
2992 2993 if (freezer == curthread) {
2993 2994 ASSERT(frozenstr(q));
2994 2995 ASSERT(MUTEX_HELD(QLOCK(q)));
2995 2996 } else if (MUTEX_HELD(QLOCK(q))) {
2996 2997 /* Don't drop lock on exit */
2997 2998 freezer = curthread;
2998 2999 } else
2999 3000 mutex_enter(QLOCK(q));
3000 3001
3001 3002 if (mcls == QPCTL) {
3002 3003 if (mp->b_band != 0)
3003 3004 mp->b_band = 0; /* force to be correct */
3004 3005 if (emp && emp->b_prev &&
3005 3006 (emp->b_prev->b_datap->db_type < QPCTL))
3006 3007 goto badord;
3007 3008 }
3008 3009 if (emp) {
3009 3010 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3010 3011 (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3011 3012 (emp->b_prev->b_band < mp->b_band))) {
3012 3013 goto badord;
3013 3014 }
3014 3015 } else {
3015 3016 tmp = q->q_last;
3016 3017 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3017 3018 badord:
3018 3019 cmn_err(CE_WARN,
3019 3020 "insq: attempt to insert message out of order "
3020 3021 "on q %p", (void *)q);
3021 3022 if (freezer != curthread)
3022 3023 mutex_exit(QLOCK(q));
3023 3024 return (0);
3024 3025 }
3025 3026 }
3026 3027
3027 3028 if (mp->b_band != 0) {
3028 3029 int i;
3029 3030 qband_t **qbpp;
3030 3031
3031 3032 if (mp->b_band > q->q_nband) {
3032 3033 qbpp = &q->q_bandp;
3033 3034 while (*qbpp)
3034 3035 qbpp = &(*qbpp)->qb_next;
3035 3036 while (mp->b_band > q->q_nband) {
3036 3037 if ((*qbpp = allocband()) == NULL) {
3037 3038 if (freezer != curthread)
3038 3039 mutex_exit(QLOCK(q));
3039 3040 return (0);
3040 3041 }
3041 3042 (*qbpp)->qb_hiwat = q->q_hiwat;
3042 3043 (*qbpp)->qb_lowat = q->q_lowat;
3043 3044 q->q_nband++;
3044 3045 qbpp = &(*qbpp)->qb_next;
3045 3046 }
3046 3047 }
3047 3048 qbp = q->q_bandp;
3048 3049 i = mp->b_band;
3049 3050 while (--i)
3050 3051 qbp = qbp->qb_next;
3051 3052 }
3052 3053
3053 3054 if ((mp->b_next = emp) != NULL) {
3054 3055 if ((mp->b_prev = emp->b_prev) != NULL)
3055 3056 emp->b_prev->b_next = mp;
3056 3057 else
3057 3058 q->q_first = mp;
3058 3059 emp->b_prev = mp;
3059 3060 } else {
3060 3061 if ((mp->b_prev = q->q_last) != NULL)
3061 3062 q->q_last->b_next = mp;
3062 3063 else
3063 3064 q->q_first = mp;
3064 3065 q->q_last = mp;
3065 3066 }
3066 3067
3067 3068 /* Get mblk and byte count for q_count accounting */
3068 3069 bytecnt = mp_cont_len(mp, &mblkcnt);
3069 3070
3070 3071 if (qbp) { /* adjust qband pointers and count */
3071 3072 if (!qbp->qb_first) {
3072 3073 qbp->qb_first = mp;
3073 3074 qbp->qb_last = mp;
3074 3075 } else {
3075 3076 if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3076 3077 (mp->b_prev->b_band != mp->b_band)))
3077 3078 qbp->qb_first = mp;
3078 3079 else if (mp->b_next == NULL || (mp->b_next != NULL &&
3079 3080 (mp->b_next->b_band != mp->b_band)))
3080 3081 qbp->qb_last = mp;
3081 3082 }
3082 3083 qbp->qb_count += bytecnt;
3083 3084 qbp->qb_mblkcnt += mblkcnt;
3084 3085 if ((qbp->qb_count >= qbp->qb_hiwat) ||
3085 3086 (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3086 3087 qbp->qb_flag |= QB_FULL;
3087 3088 }
3088 3089 } else {
3089 3090 q->q_count += bytecnt;
3090 3091 q->q_mblkcnt += mblkcnt;
3091 3092 if ((q->q_count >= q->q_hiwat) ||
3092 3093 (q->q_mblkcnt >= q->q_hiwat)) {
3093 3094 q->q_flag |= QFULL;
3094 3095 }
3095 3096 }
3096 3097
3097 3098 STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3098 3099
3099 3100 if (canenable(q) && (q->q_flag & QWANTR))
3100 3101 qenable_locked(q);
3101 3102
3102 3103 ASSERT(MUTEX_HELD(QLOCK(q)));
3103 3104 if (freezer != curthread)
3104 3105 mutex_exit(QLOCK(q));
3105 3106
3106 3107 return (1);
3107 3108 }
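insq() is normally used with the stream frozen, since the caller must examine the
queue to pick the insertion point. A minimal sketch that inserts a band-0 message
mp ahead of the first ordinary (non-priority, band 0) message; mp is assumed to
have been allocated elsewhere.

	mblk_t *emp;

	freezestr(q);
	for (emp = q->q_first; emp != NULL; emp = emp->b_next) {
		if (queclass(emp) != QPCTL && emp->b_band == 0)
			break;
	}
	if (!insq(q, emp, mp))		/* emp == NULL appends to the tail */
		freemsg(mp);		/* ordering rejected or no memory */
	unfreezestr(q);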
3108 3109
3109 3110 /*
3110 3111 * Create and put a control message on queue.
3111 3112 */
3112 3113 int
3113 3114 putctl(queue_t *q, int type)
3114 3115 {
3115 3116 mblk_t *bp;
3116 3117
3117 3118 if ((datamsg(type) && (type != M_DELAY)) ||
3118 3119 (bp = allocb_tryhard(0)) == NULL)
3119 3120 return (0);
3120 3121 	bp->b_datap->db_type = (unsigned char)type;
3121 3122
3122 3123 put(q, bp);
3123 3124
3124 3125 return (1);
3125 3126 }
3126 3127
3127 3128 /*
3128 3129 * Control message with a single-byte parameter
3129 3130 */
3130 3131 int
3131 3132 putctl1(queue_t *q, int type, int param)
3132 3133 {
3133 3134 mblk_t *bp;
3134 3135
3135 3136 if ((datamsg(type) && (type != M_DELAY)) ||
3136 3137 (bp = allocb_tryhard(1)) == NULL)
3137 3138 return (0);
3138 3139 bp->b_datap->db_type = (unsigned char)type;
3139 3140 *bp->b_wptr++ = (unsigned char)param;
3140 3141
3141 3142 put(q, bp);
3142 3143
3143 3144 return (1);
3144 3145 }
3145 3146
3146 3147 int
3147 3148 putnextctl1(queue_t *q, int type, int param)
3148 3149 {
3149 3150 mblk_t *bp;
3150 3151
3151 3152 if ((datamsg(type) && (type != M_DELAY)) ||
3152 3153 ((bp = allocb_tryhard(1)) == NULL))
3153 3154 return (0);
3154 3155
3155 3156 bp->b_datap->db_type = (unsigned char)type;
3156 3157 *bp->b_wptr++ = (unsigned char)param;
3157 3158
3158 3159 putnext(q, bp);
3159 3160
3160 3161 return (1);
3161 3162 }
3162 3163
3163 3164 int
3164 3165 putnextctl(queue_t *q, int type)
3165 3166 {
3166 3167 mblk_t *bp;
3167 3168
3168 3169 if ((datamsg(type) && (type != M_DELAY)) ||
3169 3170 ((bp = allocb_tryhard(0)) == NULL))
3170 3171 return (0);
3171 3172 bp->b_datap->db_type = (unsigned char)type;
3172 3173
3173 3174 putnext(q, bp);
3174 3175
3175 3176 return (1);
3176 3177 }
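A sketch of typical putnextctl1() use: sending a one-byte control message without
having to allocate it yourself, here an M_FLUSH sent up the read side (the error
handling is hypothetical).

	/* Ask upstream to flush its read side; returns 0 if allocation failed. */
	if (!putnextctl1(RD(q), M_FLUSH, FLUSHR))
		return (ENOMEM);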
3177 3178
3178 3179 /*
3179 3180 * Return the queue upstream from this one
3180 3181 */
3181 3182 queue_t *
3182 3183 backq(queue_t *q)
3183 3184 {
3184 3185 q = _OTHERQ(q);
3185 3186 if (q->q_next) {
3186 3187 q = q->q_next;
3187 3188 return (_OTHERQ(q));
3188 3189 }
3189 3190 return (NULL);
3190 3191 }
3191 3192
3192 3193 /*
3193 3194 * Send a block back up the queue in reverse from this
3194 3195 * one (e.g. to respond to ioctls)
3195 3196 */
3196 3197 void
3197 3198 qreply(queue_t *q, mblk_t *bp)
3198 3199 {
3199 3200 ASSERT(q && bp);
3200 3201
3201 3202 putnext(_OTHERQ(q), bp);
3202 3203 }
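qreply() is most often used to answer an M_IOCTL received on the write side by
turning the same message around; a hedged sketch with the ioctl processing itself
elided. (The miocack(9F) convenience routine wraps this same sequence.)

	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;

	mp->b_datap->db_type = M_IOCACK;	/* acknowledge the ioctl */
	iocp->ioc_error = 0;
	iocp->ioc_count = 0;
	qreply(q, mp);				/* send back up the other side */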
3203 3204
3204 3205 /*
3205 3206 * Streams Queue Scheduling
3206 3207 *
3207 3208 * Queues are enabled through qenable() when they have messages to
3208 3209 * process. They are serviced by queuerun(), which runs each enabled
3209 3210 * queue's service procedure. The call to queuerun() is processor
3210 3211 * dependent - the general principle is that it be run whenever a queue
3211 3212 * is enabled but before returning to user level. For system calls,
3212 3213 * the function runqueues() is called if their action causes a queue
3213 3214 * to be enabled. For device interrupts, queuerun() should be
3214 3215 * called before returning from the last level of interrupt. Beyond
3215 3216 * this, no timing assumptions should be made about queue scheduling.
3216 3217 */
3217 3218
3218 3219 /*
3219 3220 * Enable a queue: put it on list of those whose service procedures are
3220 3221 * ready to run and set up the scheduling mechanism.
3221 3222  * The broadcast is done outside the mutex to avoid the woken thread
3222 3223  * contending with the mutex. This is OK because the queue has been
3223 3224  * enqueued on the runlist and flagged safely at this point.
3224 3225 */
3225 3226 void
3226 3227 qenable(queue_t *q)
3227 3228 {
3228 3229 mutex_enter(QLOCK(q));
3229 3230 qenable_locked(q);
3230 3231 mutex_exit(QLOCK(q));
3231 3232 }
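qenable() is how code outside the normal putq() path schedules a service procedure;
a common example (sketched here with hypothetical names) is a qtimeout()/qbufcall()
callback that re-enables a queue once a resource shortage has passed.

	static void
	xxretry(void *arg)	/* hypothetical qtimeout()/qbufcall() callback */
	{
		queue_t *q = arg;

		qenable(q);	/* have the service routine run again */
	}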
3232 3233 /*
3233 3234 * Return number of messages on queue
3234 3235 */
3235 3236 int
3236 3237 qsize(queue_t *qp)
3237 3238 {
3238 3239 int count = 0;
3239 3240 mblk_t *mp;
3240 3241
3241 3242 mutex_enter(QLOCK(qp));
3242 3243 for (mp = qp->q_first; mp; mp = mp->b_next)
3243 3244 count++;
3244 3245 mutex_exit(QLOCK(qp));
3245 3246 return (count);
3246 3247 }
3247 3248
3248 3249 /*
3249 3250 * noenable - set queue so that putq() will not enable it.
3250 3251 * enableok - set queue so that putq() can enable it.
3251 3252 */
3252 3253 void
3253 3254 noenable(queue_t *q)
3254 3255 {
3255 3256 mutex_enter(QLOCK(q));
3256 3257 q->q_flag |= QNOENB;
3257 3258 mutex_exit(QLOCK(q));
3258 3259 }
3259 3260
3260 3261 void
3261 3262 enableok(queue_t *q)
3262 3263 {
3263 3264 mutex_enter(QLOCK(q));
3264 3265 q->q_flag &= ~QNOENB;
3265 3266 mutex_exit(QLOCK(q));
3266 3267 }
3267 3268
3268 3269 /*
3269 3270 * Set queue fields.
3270 3271 */
3271 3272 int
3272 3273 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3273 3274 {
3274 3275 qband_t *qbp = NULL;
3275 3276 queue_t *wrq;
3276 3277 int error = 0;
3277 3278 kthread_id_t freezer;
3278 3279
3279 3280 freezer = STREAM(q)->sd_freezer;
3280 3281 if (freezer == curthread) {
3281 3282 ASSERT(frozenstr(q));
3282 3283 ASSERT(MUTEX_HELD(QLOCK(q)));
3283 3284 } else
3284 3285 mutex_enter(QLOCK(q));
3285 3286
3286 3287 if (what >= QBAD) {
3287 3288 error = EINVAL;
3288 3289 goto done;
3289 3290 }
3290 3291 if (pri != 0) {
3291 3292 int i;
3292 3293 qband_t **qbpp;
3293 3294
3294 3295 if (pri > q->q_nband) {
3295 3296 qbpp = &q->q_bandp;
3296 3297 while (*qbpp)
3297 3298 qbpp = &(*qbpp)->qb_next;
3298 3299 while (pri > q->q_nband) {
3299 3300 if ((*qbpp = allocband()) == NULL) {
3300 3301 error = EAGAIN;
3301 3302 goto done;
3302 3303 }
3303 3304 (*qbpp)->qb_hiwat = q->q_hiwat;
3304 3305 (*qbpp)->qb_lowat = q->q_lowat;
3305 3306 q->q_nband++;
3306 3307 qbpp = &(*qbpp)->qb_next;
3307 3308 }
3308 3309 }
3309 3310 qbp = q->q_bandp;
3310 3311 i = pri;
3311 3312 while (--i)
3312 3313 qbp = qbp->qb_next;
3313 3314 }
3314 3315 switch (what) {
3315 3316
3316 3317 case QHIWAT:
3317 3318 if (qbp)
3318 3319 qbp->qb_hiwat = (size_t)val;
3319 3320 else
3320 3321 q->q_hiwat = (size_t)val;
3321 3322 break;
3322 3323
3323 3324 case QLOWAT:
3324 3325 if (qbp)
3325 3326 qbp->qb_lowat = (size_t)val;
3326 3327 else
3327 3328 q->q_lowat = (size_t)val;
3328 3329 break;
3329 3330
3330 3331 case QMAXPSZ:
3331 3332 if (qbp)
3332 3333 error = EINVAL;
3333 3334 else
3334 3335 q->q_maxpsz = (ssize_t)val;
3335 3336
3336 3337 /*
3337 3338 * Performance concern, strwrite looks at the module below
3338 3339 		 * the stream head for the maxpsz each time it does a write, so
3339 3340 * we now cache it at the stream head. Check to see if this
3340 3341 * queue is sitting directly below the stream head.
3341 3342 */
3342 3343 wrq = STREAM(q)->sd_wrq;
3343 3344 if (q != wrq->q_next)
3344 3345 break;
3345 3346
3346 3347 /*
3347 3348 * If the stream is not frozen drop the current QLOCK and
3348 3349 * acquire the sd_wrq QLOCK which protects sd_qn_*
3349 3350 */
3350 3351 if (freezer != curthread) {
3351 3352 mutex_exit(QLOCK(q));
3352 3353 mutex_enter(QLOCK(wrq));
3353 3354 }
3354 3355 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3355 3356
3356 3357 if (strmsgsz != 0) {
3357 3358 if (val == INFPSZ)
3358 3359 val = strmsgsz;
3359 3360 else {
3360 3361 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3361 3362 val = MIN(PIPE_BUF, val);
3362 3363 else
3363 3364 val = MIN(strmsgsz, val);
3364 3365 }
3365 3366 }
3366 3367 STREAM(q)->sd_qn_maxpsz = val;
3367 3368 if (freezer != curthread) {
3368 3369 mutex_exit(QLOCK(wrq));
3369 3370 mutex_enter(QLOCK(q));
3370 3371 }
3371 3372 break;
3372 3373
3373 3374 case QMINPSZ:
3374 3375 if (qbp)
3375 3376 error = EINVAL;
3376 3377 else
3377 3378 q->q_minpsz = (ssize_t)val;
3378 3379
3379 3380 /*
3380 3381 * Performance concern, strwrite looks at the module below
3381 3382 		 * the stream head for the maxpsz each time it does a write, so
3382 3383 * we now cache it at the stream head. Check to see if this
3383 3384 * queue is sitting directly below the stream head.
3384 3385 */
3385 3386 wrq = STREAM(q)->sd_wrq;
3386 3387 if (q != wrq->q_next)
3387 3388 break;
3388 3389
3389 3390 /*
3390 3391 * If the stream is not frozen drop the current QLOCK and
3391 3392 * acquire the sd_wrq QLOCK which protects sd_qn_*
3392 3393 */
3393 3394 if (freezer != curthread) {
3394 3395 mutex_exit(QLOCK(q));
3395 3396 mutex_enter(QLOCK(wrq));
3396 3397 }
3397 3398 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3398 3399
3399 3400 if (freezer != curthread) {
3400 3401 mutex_exit(QLOCK(wrq));
3401 3402 mutex_enter(QLOCK(q));
3402 3403 }
3403 3404 break;
3404 3405
3405 3406 case QSTRUIOT:
3406 3407 if (qbp)
3407 3408 error = EINVAL;
3408 3409 else
3409 3410 q->q_struiot = (ushort_t)val;
3410 3411 break;
3411 3412
3412 3413 case QCOUNT:
3413 3414 case QFIRST:
3414 3415 case QLAST:
3415 3416 case QFLAG:
3416 3417 error = EPERM;
3417 3418 break;
3418 3419
3419 3420 default:
3420 3421 error = EINVAL;
3421 3422 break;
3422 3423 }
3423 3424 done:
3424 3425 if (freezer != curthread)
3425 3426 mutex_exit(QLOCK(q));
3426 3427 return (error);
3427 3428 }
3428 3429
3429 3430 /*
3430 3431 * Get queue fields.
3431 3432 */
3432 3433 int
3433 3434 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3434 3435 {
3435 3436 qband_t *qbp = NULL;
3436 3437 int error = 0;
3437 3438 kthread_id_t freezer;
3438 3439
3439 3440 freezer = STREAM(q)->sd_freezer;
3440 3441 if (freezer == curthread) {
3441 3442 ASSERT(frozenstr(q));
3442 3443 ASSERT(MUTEX_HELD(QLOCK(q)));
3443 3444 } else
3444 3445 mutex_enter(QLOCK(q));
3445 3446 if (what >= QBAD) {
3446 3447 error = EINVAL;
3447 3448 goto done;
3448 3449 }
3449 3450 if (pri != 0) {
3450 3451 int i;
3451 3452 qband_t **qbpp;
3452 3453
3453 3454 if (pri > q->q_nband) {
3454 3455 qbpp = &q->q_bandp;
3455 3456 while (*qbpp)
3456 3457 qbpp = &(*qbpp)->qb_next;
3457 3458 while (pri > q->q_nband) {
3458 3459 if ((*qbpp = allocband()) == NULL) {
3459 3460 error = EAGAIN;
3460 3461 goto done;
3461 3462 }
3462 3463 (*qbpp)->qb_hiwat = q->q_hiwat;
3463 3464 (*qbpp)->qb_lowat = q->q_lowat;
3464 3465 q->q_nband++;
3465 3466 qbpp = &(*qbpp)->qb_next;
3466 3467 }
3467 3468 }
3468 3469 qbp = q->q_bandp;
3469 3470 i = pri;
3470 3471 while (--i)
3471 3472 qbp = qbp->qb_next;
3472 3473 }
3473 3474 switch (what) {
3474 3475 case QHIWAT:
3475 3476 if (qbp)
3476 3477 *(size_t *)valp = qbp->qb_hiwat;
3477 3478 else
3478 3479 *(size_t *)valp = q->q_hiwat;
3479 3480 break;
3480 3481
3481 3482 case QLOWAT:
3482 3483 if (qbp)
3483 3484 *(size_t *)valp = qbp->qb_lowat;
3484 3485 else
3485 3486 *(size_t *)valp = q->q_lowat;
3486 3487 break;
3487 3488
3488 3489 case QMAXPSZ:
3489 3490 if (qbp)
3490 3491 error = EINVAL;
3491 3492 else
3492 3493 *(ssize_t *)valp = q->q_maxpsz;
3493 3494 break;
3494 3495
3495 3496 case QMINPSZ:
3496 3497 if (qbp)
3497 3498 error = EINVAL;
3498 3499 else
3499 3500 *(ssize_t *)valp = q->q_minpsz;
3500 3501 break;
3501 3502
3502 3503 case QCOUNT:
3503 3504 if (qbp)
3504 3505 *(size_t *)valp = qbp->qb_count;
3505 3506 else
3506 3507 *(size_t *)valp = q->q_count;
3507 3508 break;
3508 3509
3509 3510 case QFIRST:
3510 3511 if (qbp)
3511 3512 *(mblk_t **)valp = qbp->qb_first;
3512 3513 else
3513 3514 *(mblk_t **)valp = q->q_first;
3514 3515 break;
3515 3516
3516 3517 case QLAST:
3517 3518 if (qbp)
3518 3519 *(mblk_t **)valp = qbp->qb_last;
3519 3520 else
3520 3521 *(mblk_t **)valp = q->q_last;
3521 3522 break;
3522 3523
3523 3524 case QFLAG:
3524 3525 if (qbp)
3525 3526 *(uint_t *)valp = qbp->qb_flag;
3526 3527 else
3527 3528 *(uint_t *)valp = q->q_flag;
3528 3529 break;
3529 3530
3530 3531 case QSTRUIOT:
3531 3532 if (qbp)
3532 3533 error = EINVAL;
3533 3534 else
3534 3535 *(short *)valp = q->q_struiot;
3535 3536 break;
3536 3537
3537 3538 default:
3538 3539 error = EINVAL;
3539 3540 break;
3540 3541 }
3541 3542 done:
3542 3543 if (freezer != curthread)
3543 3544 mutex_exit(QLOCK(q));
3544 3545 return (error);
3545 3546 }
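A sketch of the usual strqget()/strqset() pairing from a module open or ioctl
routine: read the current band-0 high-water mark and raise it (error handling
abbreviated; doubling the value is an arbitrary illustration).

	size_t hiwat;

	if (strqget(q, QHIWAT, 0, &hiwat) == 0)
		(void) strqset(q, QHIWAT, 0, (intptr_t)(hiwat * 2));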
3546 3547
3547 3548 /*
3548 3549 * Function awakes all in cvwait/sigwait/pollwait, on one of:
3549 3550  * Function wakes all in cvwait/sigwait/pollwait, on one of:
3550 3551 *
3551 3552 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3552 3553 * deferred wakeup will be done. Also if strpoll() in progress then a
3553 3554 * deferred pollwakeup will be done.
3554 3555 */
3555 3556 void
3556 3557 strwakeq(queue_t *q, int flag)
3557 3558 {
3558 3559 stdata_t *stp = STREAM(q);
3559 3560 pollhead_t *pl;
3560 3561
3561 3562 mutex_enter(&stp->sd_lock);
3562 3563 pl = &stp->sd_pollist;
3563 3564 if (flag & QWANTWSYNC) {
3564 3565 ASSERT(!(q->q_flag & QREADR));
3565 3566 if (stp->sd_flag & WSLEEP) {
3566 3567 stp->sd_flag &= ~WSLEEP;
3567 3568 cv_broadcast(&stp->sd_wrq->q_wait);
3568 3569 } else {
3569 3570 stp->sd_wakeq |= WSLEEP;
3570 3571 }
3571 3572
3572 3573 mutex_exit(&stp->sd_lock);
3573 3574 pollwakeup(pl, POLLWRNORM);
3574 3575 mutex_enter(&stp->sd_lock);
3575 3576
3576 3577 if (stp->sd_sigflags & S_WRNORM)
3577 3578 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3578 3579 } else if (flag & QWANTR) {
3579 3580 if (stp->sd_flag & RSLEEP) {
3580 3581 stp->sd_flag &= ~RSLEEP;
3581 3582 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3582 3583 } else {
3583 3584 stp->sd_wakeq |= RSLEEP;
3584 3585 }
3585 3586
3586 3587 mutex_exit(&stp->sd_lock);
3587 3588 pollwakeup(pl, POLLIN | POLLRDNORM);
3588 3589 mutex_enter(&stp->sd_lock);
3589 3590
3590 3591 {
3591 3592 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3592 3593
3593 3594 if (events)
3594 3595 strsendsig(stp->sd_siglist, events, 0, 0);
3595 3596 }
3596 3597 } else {
3597 3598 if (stp->sd_flag & WSLEEP) {
3598 3599 stp->sd_flag &= ~WSLEEP;
3599 3600 cv_broadcast(&stp->sd_wrq->q_wait);
3600 3601 }
3601 3602
3602 3603 mutex_exit(&stp->sd_lock);
3603 3604 pollwakeup(pl, POLLWRNORM);
3604 3605 mutex_enter(&stp->sd_lock);
3605 3606
3606 3607 if (stp->sd_sigflags & S_WRNORM)
3607 3608 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3608 3609 }
3609 3610 mutex_exit(&stp->sd_lock);
3610 3611 }
3611 3612
3612 3613 int
3613 3614 struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
3614 3615 {
3615 3616 stdata_t *stp = STREAM(q);
3616 3617 int typ = STRUIOT_STANDARD;
3617 3618 uio_t *uiop = &dp->d_uio;
3618 3619 dblk_t *dbp;
3619 3620 ssize_t uiocnt;
3620 3621 ssize_t cnt;
3621 3622 unsigned char *ptr;
3622 3623 ssize_t resid;
3623 3624 int error = 0;
3624 3625 on_trap_data_t otd;
3625 3626 queue_t *stwrq;
3626 3627
3627 3628 /*
3628 3629  * Plumbing may change while we are reading the type, so store the
3629 3630  * queue in a temporary variable.  It does not matter if we end up
3630 3631  * taking the type from the previous plumbing: if the plumbing
3631 3632  * changed while we were holding the queue in the temporary
3632 3633  * variable, we can still process the message the way it would
3633 3634  * have been processed in the old plumbing, with no side effect
3634 3635  * other than a little extra work for the partial IP header
3635 3636  * checksum.
3636 3637  *
3637 3638  * This has been done to avoid holding sd_lock, which is
3638 3639  * very hot.
3639 3640 */
3640 3641
3641 3642 stwrq = stp->sd_struiowrq;
3642 3643 if (stwrq)
3643 3644 typ = stwrq->q_struiot;
3644 3645
3645 3646 for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
3646 3647 dbp = mp->b_datap;
3647 3648 ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
3648 3649 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3649 3650 cnt = MIN(uiocnt, uiop->uio_resid);
3650 3651 if (!(dbp->db_struioflag & STRUIO_SPEC) ||
3651 3652 (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
3652 3653 /*
3653 3654 * Either this mblk has already been processed
3654 3655 * or there is no more room in this mblk (?).
3655 3656 */
3656 3657 continue;
3657 3658 }
3658 3659 switch (typ) {
3659 3660 case STRUIOT_STANDARD:
3660 3661 if (noblock) {
3661 3662 if (on_trap(&otd, OT_DATA_ACCESS)) {
3662 3663 no_trap();
3663 3664 error = EWOULDBLOCK;
3664 3665 goto out;
3665 3666 }
3666 3667 }
3667 3668 if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
3668 3669 if (noblock)
3669 3670 no_trap();
3670 3671 goto out;
3671 3672 }
3672 3673 if (noblock)
3673 3674 no_trap();
3674 3675 break;
3675 3676
3676 3677 default:
3677 3678 error = EIO;
3678 3679 goto out;
3679 3680 }
3680 3681 dbp->db_struioflag |= STRUIO_DONE;
3681 3682 dbp->db_cksumstuff += cnt;
3682 3683 }
3683 3684 out:
3684 3685 if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
3685 3686 /*
3686 3687  * A fault has occurred and some bytes were moved to the
3687 3688 * current mblk, the uio_t has already been updated by
3688 3689 * the appropriate uio routine, so also update the mblk
3689 3690 * to reflect this in case this same mblk chain is used
3690 3691 * again (after the fault has been handled).
3691 3692 */
3692 3693 uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
3693 3694 if (uiocnt >= resid)
3694 3695 dbp->db_cksumstuff += resid;
3695 3696 }
3696 3697 return (error);
3697 3698 }
3698 3699
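[Editor's note, not part of the webrev: struioget()'s noblock path wraps uiomove() in the on_trap()/no_trap() bracket so that a fault on the user buffer comes back as EWOULDBLOCK instead of sleeping.  A minimal sketch of that idiom in isolation; buf, nbytes and uiop are hypothetical locals.]

	on_trap_data_t otd;
	int error;

	if (on_trap(&otd, OT_DATA_ACCESS)) {
		/* Control arrives here if uiomove() below faulted. */
		no_trap();
		error = EWOULDBLOCK;
	} else {
		error = uiomove(buf, nbytes, UIO_WRITE, uiop);
		no_trap();
	}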
3699 3700 /*
3700 3701  * Try to enter the queue synchronously.  Any attempt to enter a closing queue
3701 3702  * will fail.  The qp->q_rwcnt keeps track of the number of successful entries so
3702 3703 * that removeq() will not try to close the queue while a thread is inside the
3703 3704 * queue.
3704 3705 */
3705 3706 static boolean_t
3706 3707 rwnext_enter(queue_t *qp)
3707 3708 {
3708 3709 mutex_enter(QLOCK(qp));
3709 3710 if (qp->q_flag & QWCLOSE) {
3710 3711 mutex_exit(QLOCK(qp));
3711 3712 return (B_FALSE);
3712 3713 }
3713 3714 qp->q_rwcnt++;
3714 3715 ASSERT(qp->q_rwcnt != 0);
3715 3716 mutex_exit(QLOCK(qp));
3716 3717 return (B_TRUE);
3717 3718 }
3718 3719
3719 3720 /*
3720 3721 * Decrease the count of threads running in sync stream queue and wake up any
3721 3722 * threads blocked in removeq().
3722 3723 */
3723 3724 static void
3724 3725 rwnext_exit(queue_t *qp)
3725 3726 {
3726 3727 mutex_enter(QLOCK(qp));
3727 3728 qp->q_rwcnt--;
3728 3729 if (qp->q_flag & QWANTRMQSYNC) {
3729 3730 qp->q_flag &= ~QWANTRMQSYNC;
3730 3731 cv_broadcast(&qp->q_wait);
3731 3732 }
3732 3733 mutex_exit(QLOCK(qp));
3733 3734 }
3734 3735
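[Editor's note, not part of the webrev: the two helpers above form a reference-counted bracket that keeps removeq() from closing a queue while a thread is inside it.  A hedged sketch of the calling shape, which rwnext() below fills in with the full syncq protocol; qp, dp and rval are assumed from the surrounding context.]

	if (rwnext_enter(qp) == B_FALSE)
		return (EINVAL);		/* queue is closing; give up */

	rval = (*qp->q_qinfo->qi_rwp)(qp, dp);	/* run the queue's rw procedure */

	rwnext_exit(qp);			/* let a blocked removeq() proceed */
	return (rval);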
3735 3736 /*
3736 3737 * The purpose of rwnext() is to call the rw procedure of the next
3737 3738  * (downstream) module's queue.
3738 3739  *
3739 3740  * It is treated as a put entrypoint for perimeter synchronization.
3740 3741 *
3741 3742 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3742 3743  * sync queues).  If it is a CIPUT sync queue, sq_count is incremented and it does
3743 3744  * not matter if any regular put entrypoints have already been entered.  We
3744 3745 * can't increment one of the sq_putcounts (instead of sq_count) because
3745 3746 * qwait_rw won't know which counter to decrement.
3746 3747 *
3747 3748 * It would be reasonable to add the lockless FASTPUT logic.
3748 3749 */
3749 3750 int
3750 3751 rwnext(queue_t *qp, struiod_t *dp)
3751 3752 {
3752 3753 queue_t *nqp;
3753 3754 syncq_t *sq;
3754 3755 uint16_t count;
3755 3756 uint16_t flags;
3756 3757 struct qinit *qi;
3757 3758 int (*proc)();
3758 3759 struct stdata *stp;
3759 3760 int isread;
3760 3761 int rval;
3761 3762
3762 3763 stp = STREAM(qp);
3763 3764 /*
3764 3765 * Prevent q_next from changing by holding sd_lock until acquiring
3765 3766 * SQLOCK. Note that a read-side rwnext from the streamhead will
3766 3767 * already have sd_lock acquired. In either case sd_lock is always
3767 3768 * released after acquiring SQLOCK.
3768 3769 *
3769 3770 * The streamhead read-side holding sd_lock when calling rwnext is
3770 3771  * required to prevent a race condition where M_DATA mblks flowing
3771 3772 * up the read-side of the stream could be bypassed by a rwnext()
3772 3773 * down-call. In this case sd_lock acts as the streamhead perimeter.
3773 3774 */
3774 3775 if ((nqp = _WR(qp)) == qp) {
3775 3776 isread = 0;
3776 3777 mutex_enter(&stp->sd_lock);
3777 3778 qp = nqp->q_next;
3778 3779 } else {
3779 3780 isread = 1;
3780 3781 if (nqp != stp->sd_wrq)
3781 3782 /* Not streamhead */
3782 3783 mutex_enter(&stp->sd_lock);
3783 3784 qp = _RD(nqp->q_next);
3784 3785 }
3785 3786 qi = qp->q_qinfo;
3786 3787 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3787 3788 /*
3788 3789 * Not a synchronous module or no r/w procedure for this
3789 3790 * queue, so just return EINVAL and let the caller handle it.
3790 3791 */
3791 3792 mutex_exit(&stp->sd_lock);
3792 3793 return (EINVAL);
3793 3794 }
3794 3795
3795 3796 if (rwnext_enter(qp) == B_FALSE) {
3796 3797 mutex_exit(&stp->sd_lock);
3797 3798 return (EINVAL);
3798 3799 }
3799 3800
3800 3801 sq = qp->q_syncq;
3801 3802 mutex_enter(SQLOCK(sq));
3802 3803 mutex_exit(&stp->sd_lock);
3803 3804 count = sq->sq_count;
3804 3805 flags = sq->sq_flags;
3805 3806 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3806 3807
3807 3808 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3808 3809 /*
3809 3810 * if this queue is being closed, return.
3810 3811 */
3811 3812 if (qp->q_flag & QWCLOSE) {
3812 3813 mutex_exit(SQLOCK(sq));
3813 3814 rwnext_exit(qp);
3814 3815 return (EINVAL);
3815 3816 }
3816 3817
3817 3818 /*
3818 3819 * Wait until we can enter the inner perimeter.
3819 3820 */
3820 3821 sq->sq_flags = flags | SQ_WANTWAKEUP;
3821 3822 cv_wait(&sq->sq_wait, SQLOCK(sq));
3822 3823 count = sq->sq_count;
3823 3824 flags = sq->sq_flags;
3824 3825 }
3825 3826
3826 3827 if (isread == 0 && stp->sd_struiowrq == NULL ||
3827 3828 isread == 1 && stp->sd_struiordq == NULL) {
3828 3829 /*
3829 3830 * Stream plumbing changed while waiting for inner perimeter
3830 3831 * so just return EINVAL and let the caller handle it.
3831 3832 */
3832 3833 mutex_exit(SQLOCK(sq));
3833 3834 rwnext_exit(qp);
3834 3835 return (EINVAL);
3835 3836 }
3836 3837 if (!(flags & SQ_CIPUT))
3837 3838 sq->sq_flags = flags | SQ_EXCL;
3838 3839 sq->sq_count = count + 1;
3839 3840 ASSERT(sq->sq_count != 0); /* Wraparound */
3840 3841 /*
3841 3842 * Note: The only message ordering guarantee that rwnext() makes is
3842 3843 * for the write queue flow-control case. All others (r/w queue
3843 3844  * with q_count > 0 (or q_first != 0)) are the responsibility of
3844 3845  * the queue's rw procedure.  This could be generalized here by
3845 3846  * running the queue's service procedure, but that wouldn't be
3846 3847  * the most efficient for all cases.
3847 3848 */
3848 3849 mutex_exit(SQLOCK(sq));
3849 3850 if (! isread && (qp->q_flag & QFULL)) {
3850 3851 /*
3851 3852 * Write queue may be flow controlled. If so,
3852 3853 * mark the queue for wakeup when it's not.
3853 3854 */
3854 3855 mutex_enter(QLOCK(qp));
3855 3856 if (qp->q_flag & QFULL) {
3856 3857 qp->q_flag |= QWANTWSYNC;
3857 3858 mutex_exit(QLOCK(qp));
3858 3859 rval = EWOULDBLOCK;
3859 3860 goto out;
3860 3861 }
3861 3862 mutex_exit(QLOCK(qp));
3862 3863 }
3863 3864
3864 3865 if (! isread && dp->d_mp)
3865 3866 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3866 3867 dp->d_mp->b_datap->db_base);
3867 3868
3868 3869 rval = (*proc)(qp, dp);
3869 3870
3870 3871 if (isread && dp->d_mp)
3871 3872 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3872 3873 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3873 3874 out:
3874 3875 /*
3875 3876 * The queue is protected from being freed by sq_count, so it is
3876 3877 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3877 3878 */
3878 3879 rwnext_exit(qp);
3879 3880
3880 3881 mutex_enter(SQLOCK(sq));
3881 3882 flags = sq->sq_flags;
3882 3883 ASSERT(sq->sq_count != 0);
3883 3884 sq->sq_count--;
3884 3885 if (flags & SQ_TAIL) {
3885 3886 putnext_tail(sq, qp, flags);
3886 3887 /*
3887 3888 * The only purpose of this ASSERT is to preserve calling stack
3888 3889 * in DEBUG kernel.
3889 3890 */
3890 3891 ASSERT(flags & SQ_TAIL);
3891 3892 return (rval);
3892 3893 }
3893 3894 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3894 3895 /*
3895 3896 * Safe to always drop SQ_EXCL:
3896 3897 * Not SQ_CIPUT means we set SQ_EXCL above
3897 3898 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3898 3899 * did a qwriter(INNER) in which case nobody else
3899 3900 * is in the inner perimeter and we are exiting.
3900 3901 *
3901 3902 * I would like to make the following assertion:
3902 3903 *
3903 3904 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3904 3905 * sq->sq_count == 0);
3905 3906 *
3906 3907 * which indicates that if we are both putshared and exclusive,
3907 3908 * we became exclusive while executing the putproc, and the only
3908 3909 * claim on the syncq was the one we dropped a few lines above.
3909 3910 * But other threads that enter putnext while the syncq is exclusive
3910 3911 * need to make a claim as they may need to drop SQLOCK in the
3911 3912 * has_writers case to avoid deadlocks. If these threads are
3912 3913 * delayed or preempted, it is possible that the writer thread can
3913 3914 * find out that there are other claims making the (sq_count == 0)
3914 3915 * test invalid.
3915 3916 */
3916 3917
3917 3918 sq->sq_flags = flags & ~SQ_EXCL;
3918 3919 if (sq->sq_flags & SQ_WANTWAKEUP) {
3919 3920 sq->sq_flags &= ~SQ_WANTWAKEUP;
3920 3921 cv_broadcast(&sq->sq_wait);
3921 3922 }
3922 3923 mutex_exit(SQLOCK(sq));
3923 3924 return (rval);
3924 3925 }
3925 3926
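[Editor's note, not part of the webrev: rwnext() above and infonext() below follow the same syncq claim/release shape.  A hedged distillation of that protocol, omitting the QFULL, flow-trace and SQ_TAIL details; sq, qp, dp, proc and rval are assumed from the surrounding context.]

	mutex_enter(SQLOCK(sq));
	while ((sq->sq_flags & SQ_GOAWAY) ||
	    (!(sq->sq_flags & SQ_CIPUT) && sq->sq_count != 0)) {
		sq->sq_flags |= SQ_WANTWAKEUP;	/* wait to enter the perimeter */
		cv_wait(&sq->sq_wait, SQLOCK(sq));
	}
	if (!(sq->sq_flags & SQ_CIPUT))
		sq->sq_flags |= SQ_EXCL;	/* exclusive entry */
	sq->sq_count++;				/* claim the syncq */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, dp);			/* the downstream procedure */

	mutex_enter(SQLOCK(sq));
	sq->sq_count--;				/* drop the claim */
	sq->sq_flags &= ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));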
3926 3927 /*
3927 3928 * The purpose of infonext() is to call the info procedure of the next
3928 3929  * (downstream) module's queue.
3929 3930  *
3930 3931  * It is treated as a put entrypoint for perimeter synchronization.
3931 3932 *
3932 3933 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3933 3934  * sync queues).  If it is a CIPUT sync queue, the regular sq_count is incremented
3934 3935  * and it does not matter if any regular put entrypoints have already been
3935 3936  * entered.
3936 3937 */
3937 3938 int
3938 3939 infonext(queue_t *qp, infod_t *idp)
3939 3940 {
3940 3941 queue_t *nqp;
3941 3942 syncq_t *sq;
3942 3943 uint16_t count;
3943 3944 uint16_t flags;
3944 3945 struct qinit *qi;
3945 3946 int (*proc)();
3946 3947 struct stdata *stp;
3947 3948 int rval;
3948 3949
3949 3950 stp = STREAM(qp);
3950 3951 /*
3951 3952 * Prevent q_next from changing by holding sd_lock until
3952 3953 * acquiring SQLOCK.
3953 3954 */
3954 3955 mutex_enter(&stp->sd_lock);
3955 3956 if ((nqp = _WR(qp)) == qp) {
3956 3957 qp = nqp->q_next;
3957 3958 } else {
3958 3959 qp = _RD(nqp->q_next);
3959 3960 }
3960 3961 qi = qp->q_qinfo;
3961 3962 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3962 3963 mutex_exit(&stp->sd_lock);
3963 3964 return (EINVAL);
3964 3965 }
3965 3966 sq = qp->q_syncq;
3966 3967 mutex_enter(SQLOCK(sq));
3967 3968 mutex_exit(&stp->sd_lock);
3968 3969 count = sq->sq_count;
3969 3970 flags = sq->sq_flags;
3970 3971 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3971 3972
3972 3973 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3973 3974 /*
3974 3975 * Wait until we can enter the inner perimeter.
3975 3976 */
3976 3977 sq->sq_flags = flags | SQ_WANTWAKEUP;
3977 3978 cv_wait(&sq->sq_wait, SQLOCK(sq));
3978 3979 count = sq->sq_count;
3979 3980 flags = sq->sq_flags;
3980 3981 }
3981 3982
3982 3983 if (! (flags & SQ_CIPUT))
3983 3984 sq->sq_flags = flags | SQ_EXCL;
3984 3985 sq->sq_count = count + 1;
3985 3986 ASSERT(sq->sq_count != 0); /* Wraparound */
3986 3987 mutex_exit(SQLOCK(sq));
3987 3988
3988 3989 rval = (*proc)(qp, idp);
3989 3990
3990 3991 mutex_enter(SQLOCK(sq));
3991 3992 flags = sq->sq_flags;
3992 3993 ASSERT(sq->sq_count != 0);
3993 3994 sq->sq_count--;
3994 3995 if (flags & SQ_TAIL) {
3995 3996 putnext_tail(sq, qp, flags);
3996 3997 /*
3997 3998 * The only purpose of this ASSERT is to preserve calling stack
3998 3999 * in DEBUG kernel.
3999 4000 */
4000 4001 ASSERT(flags & SQ_TAIL);
4001 4002 return (rval);
4002 4003 }
4003 4004 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
4004 4005 /*
4005 4006 * XXXX
4006 4007 * I am not certain the next comment is correct here. I need to consider
4007 4008 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
4008 4009 * might cause other problems. It just might be safer to drop it if
4009 4010 * !SQ_CIPUT because that is when we set it.
4010 4011 */
4011 4012 /*
4012 4013 * Safe to always drop SQ_EXCL:
4013 4014 * Not SQ_CIPUT means we set SQ_EXCL above
4014 4015 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4015 4016 * did a qwriter(INNER) in which case nobody else
4016 4017 * is in the inner perimeter and we are exiting.
4017 4018 *
4018 4019 * I would like to make the following assertion:
4019 4020 *
4020 4021 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4021 4022 * sq->sq_count == 0);
4022 4023 *
4023 4024 * which indicates that if we are both putshared and exclusive,
4024 4025 * we became exclusive while executing the putproc, and the only
4025 4026 * claim on the syncq was the one we dropped a few lines above.
4026 4027 * But other threads that enter putnext while the syncq is exclusive
4027 4028 * need to make a claim as they may need to drop SQLOCK in the
4028 4029 * has_writers case to avoid deadlocks. If these threads are
4029 4030 * delayed or preempted, it is possible that the writer thread can
4030 4031 * find out that there are other claims making the (sq_count == 0)
4031 4032 * test invalid.
4032 4033 */
4033 4034
4034 4035 sq->sq_flags = flags & ~SQ_EXCL;
4035 4036 mutex_exit(SQLOCK(sq));
4036 4037 return (rval);
4037 4038 }
4038 4039
4039 4040 /*
4040 4041 * Return nonzero if the queue is responsible for struio(), else return 0.
4041 4042 */
4042 4043 int
4043 4044 isuioq(queue_t *q)
4044 4045 {
4045 4046 if (q->q_flag & QREADR)
4046 4047 return (STREAM(q)->sd_struiordq == q);
4047 4048 else
4048 4049 return (STREAM(q)->sd_struiowrq == q);
4049 4050 }
4050 4051
4051 4052 #if defined(__sparc)
4052 4053 int disable_putlocks = 0;
4053 4054 #else
4054 4055 int disable_putlocks = 1;
4055 4056 #endif
4056 4057
4057 4058 /*
4058 4059  * Called by create_putlocks().
4059 4060 */
4060 4061 static void
4061 4062 create_syncq_putlocks(queue_t *q)
4062 4063 {
4063 4064 syncq_t *sq = q->q_syncq;
4064 4065 ciputctrl_t *cip;
4065 4066 int i;
4066 4067
4067 4068 ASSERT(sq != NULL);
4068 4069
4069 4070 ASSERT(disable_putlocks == 0);
4070 4071 ASSERT(n_ciputctrl >= min_n_ciputctrl);
4071 4072 ASSERT(ciputctrl_cache != NULL);
4072 4073
4073 4074 if (!(sq->sq_type & SQ_CIPUT))
4074 4075 return;
4075 4076
4076 4077 for (i = 0; i <= 1; i++) {
4077 4078 if (sq->sq_ciputctrl == NULL) {
4078 4079 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4079 4080 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4080 4081 mutex_enter(SQLOCK(sq));
4081 4082 if (sq->sq_ciputctrl != NULL) {
4082 4083 mutex_exit(SQLOCK(sq));
4083 4084 kmem_cache_free(ciputctrl_cache, cip);
4084 4085 } else {
4085 4086 ASSERT(sq->sq_nciputctrl == 0);
4086 4087 sq->sq_nciputctrl = n_ciputctrl - 1;
4087 4088 /*
4088 4089 * putnext checks sq_ciputctrl without holding
4089 4090  * SQLOCK.  If it is not NULL, putnext assumes
4090 4091  * sq_nciputctrl is initialized.  The membar below
4091 4092  * ensures that.
4092 4093 */
4093 4094 membar_producer();
4094 4095 sq->sq_ciputctrl = cip;
4095 4096 mutex_exit(SQLOCK(sq));
4096 4097 }
4097 4098 }
4098 4099 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4099 4100 if (i == 1)
4100 4101 break;
4101 4102 q = _OTHERQ(q);
4102 4103 if (!(q->q_flag & QPERQ)) {
4103 4104 ASSERT(sq == q->q_syncq);
4104 4105 break;
4105 4106 }
4106 4107 ASSERT(q->q_syncq != NULL);
4107 4108 ASSERT(sq != q->q_syncq);
4108 4109 sq = q->q_syncq;
4109 4110 ASSERT(sq->sq_type & SQ_CIPUT);
4110 4111 }
4111 4112 }
4112 4113
4113 4114 /*
4114 4115  * If the stream argument is 0, only create per-cpu sq_putlocks/sq_putcounts for
4115 4116  * the syncq of q.  If the stream argument is not 0, create per-cpu stream_putlocks
4116 4117  * for the stream of q and per-cpu sq_putlocks/sq_putcounts for all syncqs
4117 4118  * starting from q and down to the driver.
4118 4119 *
4119 4120  * This should be called after the affected queues are part of the stream
4120 4121  * geometry.  It should be called from the driver/module open routine after the
4121 4122  * qprocson() call.  It is also called from the NFS syscall, where it is known
4122 4123  * that the stream is configured and won't change its geometry during the
4123 4124  * create_putlocks() call.
4124 4125 *
4125 4126  * The caller normally passes 0 for the stream argument to speed up MT putnext
4126 4127  * into the perimeter of q, for example because its perimeter is per module
4127 4128  * (e.g. IP).
4128 4129 *
4129 4130  * The caller normally passes a nonzero value for the stream argument to hint
4130 4131  * that the stream of q is a very contended global system stream
4131 4132  * (e.g. NFS/UDP) and that the part of the stream from q to the driver is
4132 4133  * particularly MT hot.
4133 4134 *
4134 4135  * The caller ensures stream plumbing won't happen while we are here, and therefore
4135 4136 * q_next can be safely used.
4136 4137 */
4137 4138
4138 4139 void
4139 4140 create_putlocks(queue_t *q, int stream)
4140 4141 {
4141 4142 ciputctrl_t *cip;
4142 4143 struct stdata *stp = STREAM(q);
4143 4144
4144 4145 q = _WR(q);
4145 4146 ASSERT(stp != NULL);
4146 4147
4147 4148 if (disable_putlocks != 0)
4148 4149 return;
4149 4150
4150 4151 if (n_ciputctrl < min_n_ciputctrl)
4151 4152 return;
4152 4153
4153 4154 ASSERT(ciputctrl_cache != NULL);
4154 4155
4155 4156 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4156 4157 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4157 4158 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4158 4159 mutex_enter(&stp->sd_lock);
4159 4160 if (stp->sd_ciputctrl != NULL) {
4160 4161 mutex_exit(&stp->sd_lock);
4161 4162 kmem_cache_free(ciputctrl_cache, cip);
4162 4163 } else {
4163 4164 ASSERT(stp->sd_nciputctrl == 0);
4164 4165 stp->sd_nciputctrl = n_ciputctrl - 1;
4165 4166 /*
4166 4167 * putnext checks sd_ciputctrl without holding
4167 4168  * sd_lock.  If it is not NULL, putnext assumes
4168 4169  * sd_nciputctrl is initialized.  The membar below
4169 4170  * ensures that.
4170 4171 */
4171 4172 membar_producer();
4172 4173 stp->sd_ciputctrl = cip;
4173 4174 mutex_exit(&stp->sd_lock);
4174 4175 }
4175 4176 }
4176 4177
4177 4178 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4178 4179
4179 4180 while (_SAMESTR(q)) {
4180 4181 create_syncq_putlocks(q);
4181 4182 if (stream == 0)
4182 4183 return;
4183 4184 q = q->q_next;
4184 4185 }
4185 4186 ASSERT(q != NULL);
4186 4187 create_syncq_putlocks(q);
4187 4188 }
4188 4189
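[Editor's note, not part of the webrev: create_syncq_putlocks() and create_putlocks() publish their ciputctrl arrays with the same race-free idiom — allocate outside the lock, re-check under the lock, and issue membar_producer() so a reader that tests the pointer without the lock is guaranteed to see the element count first.  A generic sketch; obj_t, obj_cache, lock, published, published_cnt and n are hypothetical names.]

	obj_t *new = kmem_cache_alloc(obj_cache, KM_SLEEP);

	mutex_enter(&lock);
	if (published != NULL) {
		mutex_exit(&lock);
		kmem_cache_free(obj_cache, new);	/* another thread won the race */
	} else {
		published_cnt = n - 1;
		membar_producer();	/* count must be visible before the pointer */
		published = new;
		mutex_exit(&lock);
	}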
4189 4190 /*
4190 4191 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4191 4192 * through a stream.
4192 4193 *
4193 4194  * Data currently recorded per event: a timestamp, module/driver name,
4194 4195 * downstream module/driver name, optional callstack, event type and a per
4195 4196 * type datum. Much of the STREAMS framework is instrumented for automatic
4196 4197 * flow tracing (when enabled). Events can be defined and used by STREAMS
4197 4198 * modules and drivers.
4198 4199 *
4199 4200 * Global objects:
4200 4201 *
4201 4202 * str_ftevent() - Add a flow-trace event to a dblk.
4202 4203 * str_ftfree() - Free flow-trace data
4203 4204 *
4204 4205 * Local objects:
4205 4206 *
4206 4207 * fthdr_cache - pointer to the kmem cache for trace header.
4207 4208 * ftblk_cache - pointer to the kmem cache for trace data blocks.
4208 4209 */
4209 4210
4210 4211 int str_ftnever = 1; /* Don't do STREAMS flow tracing */
4211 4212 int str_ftstack = 0; /* Don't record event call stacks */
4212 4213
4213 4214 void
4214 4215 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4215 4216 {
4216 4217 ftblk_t *bp = hp->tail;
4217 4218 ftblk_t *nbp;
4218 4219 ftevnt_t *ep;
4219 4220 int ix, nix;
4220 4221
4221 4222 ASSERT(hp != NULL);
4222 4223
4223 4224 for (;;) {
4224 4225 if ((ix = bp->ix) == FTBLK_EVNTS) {
4225 4226 /*
4226 4227 * Tail doesn't have room, so need a new tail.
4227 4228 *
4228 4229 * To make this MT safe, first, allocate a new
4229 4230 * ftblk, and initialize it. To make life a
4230 4231 * little easier, reserve the first slot (mostly
4231 4232 * by making ix = 1). When we are finished with
4232 4233 * the initialization, CAS this pointer to the
4233 4234 * tail. If this succeeds, this is the new
4234 4235 * "next" block. Otherwise, another thread
4235 4236 * got here first, so free the block and start
4236 4237 * again.
4237 4238 */
4238 4239 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4239 4240 if (nbp == NULL) {
4240 4241 /* no mem, so punt */
4241 4242 str_ftnever++;
4242 4243 /* free up all flow data? */
4243 4244 return;
4244 4245 }
4245 4246 nbp->nxt = NULL;
4246 4247 nbp->ix = 1;
4247 4248 /*
4248 4249 * Just in case there is another thread about
4249 4250 * to get the next index, we need to make sure
4250 4251 * the value is there for it.
4251 4252 */
4252 4253 membar_producer();
4253 - if (casptr(&hp->tail, bp, nbp) == bp) {
4254 + if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4254 4255 /* CAS was successful */
4255 4256 bp->nxt = nbp;
4256 4257 membar_producer();
4257 4258 bp = nbp;
4258 4259 ix = 0;
4259 4260 goto cas_good;
4260 4261 } else {
4261 4262 kmem_cache_free(ftblk_cache, nbp);
4262 4263 bp = hp->tail;
4263 4264 continue;
4264 4265 }
4265 4266 }
4266 4267 nix = ix + 1;
4267 - if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
4268 + if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4268 4269 cas_good:
4269 4270 if (curthread != hp->thread) {
4270 4271 hp->thread = curthread;
4271 4272 evnt |= FTEV_CS;
4272 4273 }
4273 4274 if (CPU->cpu_seqid != hp->cpu_seqid) {
4274 4275 hp->cpu_seqid = CPU->cpu_seqid;
4275 4276 evnt |= FTEV_PS;
4276 4277 }
4277 4278 ep = &bp->ev[ix];
4278 4279 break;
4279 4280 }
4280 4281 }
4281 4282
4282 4283 if (evnt & FTEV_QMASK) {
4283 4284 queue_t *qp = p;
4284 4285
4285 4286 if (!(qp->q_flag & QREADR))
4286 4287 evnt |= FTEV_ISWR;
4287 4288
4288 4289 ep->mid = Q2NAME(qp);
4289 4290
4290 4291 /*
4291 4292 * We only record the next queue name for FTEV_PUTNEXT since
4292 4293 * that's the only time we *really* need it, and the putnext()
4293 4294 * code ensures that qp->q_next won't vanish. (We could use
4294 4295 * claimstr()/releasestr() but at a performance cost.)
4295 4296 */
4296 4297 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4297 4298 ep->midnext = Q2NAME(qp->q_next);
4298 4299 else
4299 4300 ep->midnext = NULL;
4300 4301 } else {
4301 4302 ep->mid = p;
4302 4303 ep->midnext = NULL;
4303 4304 }
4304 4305
4305 4306 if (ep->stk != NULL)
4306 4307 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4307 4308
4308 4309 ep->ts = gethrtime();
4309 4310 ep->evnt = evnt;
4310 4311 ep->data = data;
4311 4312 hp->hash = (hp->hash << 9) + hp->hash;
4312 4313 hp->hash += (evnt << 16) | data;
4313 4314 hp->hash += (uintptr_t)ep->mid;
4314 4315 }
4315 4316
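[Editor's note, not part of the webrev: the two changed hunks above are the substance of this fix — the deprecated casptr() and cas32() are replaced by atomic_cas_ptr() and atomic_cas_32() from <sys/atomic.h> (already included in this file), which have the same compare-and-swap semantics: the old value is returned, so success is detected by comparing it with the expected value.  A hedged, abbreviated sketch of the lock-free tail append str_ftevent() performs with the new interfaces:]

	/* Claim the next event slot in the current tail block. */
	int ix = bp->ix;
	if (atomic_cas_32((uint32_t *)&bp->ix, ix, ix + 1) == ix) {
		/* slot ix is ours to fill */
	}

	/* Swing hp->tail from the full block bp to a freshly built nbp. */
	if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
		bp->nxt = nbp;				/* we installed the new tail */
	} else {
		kmem_cache_free(ftblk_cache, nbp);	/* lost the race; retry */
	}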
4316 4317 /*
4317 4318 * Free flow-trace data.
4318 4319 */
4319 4320 void
4320 4321 str_ftfree(dblk_t *dbp)
4321 4322 {
4322 4323 fthdr_t *hp = dbp->db_fthdr;
4323 4324 ftblk_t *bp = &hp->first;
4324 4325 ftblk_t *nbp;
4325 4326
4326 4327 if (bp != hp->tail || bp->ix != 0) {
4327 4328 /*
4328 4329 * Clear out the hash, have the tail point to itself, and free
4329 4330 * any continuation blocks.
4330 4331 */
4331 4332 bp = hp->first.nxt;
4332 4333 hp->tail = &hp->first;
4333 4334 hp->hash = 0;
4334 4335 hp->first.nxt = NULL;
4335 4336 hp->first.ix = 0;
4336 4337 while (bp != NULL) {
4337 4338 nbp = bp->nxt;
4338 4339 kmem_cache_free(ftblk_cache, bp);
4339 4340 bp = nbp;
4340 4341 }
4341 4342 }
4342 4343 kmem_cache_free(fthdr_cache, hp);
4343 4344 dbp->db_fthdr = NULL;
4344 4345 }