📜  Angular 7 | 可观察对象(Observables)

📅  最后修改于: 2021-05-13 20:34:42             🧑  作者: Mango


  • next:它是可观察对象发出的每条消息的处理程序,执行开始后可能被调用零次或多次。
import { Component, OnInit } from '@angular/core';
import {Observable} from 'rxjs';
    selector: 'app-next-example',
    templateUrl: './next-example.component.html',
    styleUrls: ['./next-example.component.css']
export class NextExampleComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        // Create a new Observable
        const sqnc = new Observable(countOnetoTen);
        // Execute the Observable and print the
        // result of each notification
        // next() is a call to countOnetoTen method
        // to get the next value from the observable
            next(num) { console.log(num); }
        // This function runs when subscribe()
        // is called
        function countOnetoTen(observer) {
            for(var i = 1; i <= 10; i++) {
                // Calls the next observable
                // notification
            // Unsubscribe after completing
            // the sequence
            return {unsubscribe(){}};

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
    selector: 'app-error-example',
    templateUrl: './error-example.component.html',
    styleUrls: ['./error-example.component.css']
export class ErrorExampleComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        // Create a new Observable
        const sqnc = new Observable(generateError);
        // Execute the Observable and print the
        // result of each notification
        // error() is called when producing the next
        // value throws an error
            next(num) { },
            error(err) { console.log('Error Somewhere')}
        // This function runs when subscribe() is called
        function generateError(observer){
            // Calls the next observable notification
            // It generates an error and error is called
            observer.next( adddlert("Welcome guest!"));
            // Unsubscribe after completing the sequence
            return {unsubscribe(){}};

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
    selector: 'app-complete-example',
    templateUrl: './complete-example.component.html',
    styleUrls: ['./complete-example.component.css']
export class CompleteExampleComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        // Create a new Observable
        const sqnc = new Observable(countOnetoTen);
        // Execute the Observable and print the
        // result of each notification
            next(num) { console.log(num); },
        // This function runs when subscribe()
        // is called
        function countOnetoTen(observer){
            for(var i = 1; i <= 10; i++) {
                // Calls the next observable
                // notification
            // Unsubscribe after completing
            // the sequence
            return {unsubscribe(){}};

import { Component } from '@angular/core';
import {Observable} from "rxjs";
import { CompileTemplateMetadata } from '@angular/compiler';
    selector: 'app-rt',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css']
export class AppComponent {
    title = 'MyFirstApp';
// Create a new Observable that will
// deliver the above sequence
const table = new Observable(tableOfTwo);
// Execute the Observable and print the
// result of each notification
// next() is the handler that receives each value
// emitted by tableOfTwo() through observer.next()
    next(num) { console.log(num); },
    complete() { console.log('Finished sequence'); }
// This function runs when subscribe() is called
function tableOfTwo(observer) {
    for(var i = 1; i <= 10; i++) {
      observer.next('2 * ' + i + ' = ' + i*2);
    return {unsubscribe(){}};

import { Component, OnInit } from '@angular/core';
import {Observable} from 'rxjs';
    selector: 'app-my-page',
    templateUrl: './my-page.component.html',
    styleUrls: ['./my-page.component.css']
export class MyPageComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        const multiSeq = new Observable(this.multiSeqSubs());
            next(num) { console.log('1st subscribe: ' + num); },
            complete() { console.log('1st sequence finished.'); }
        // Subscribe again After 1 seconds.
        setTimeout(() => {
                next(num) { console.log('2nd subscribe: ' + num); },
                complete() { console.log('2nd sequence finished.'); }
        }, 1000);
    multiSeqSubs() {
        const seq = [];
        for (var i = 1; i <= 10; i++) {
            // Pushes the string onto sequence
            seq.push('2 * ' + i + '=' + 2*i)
        // Keep track of each observer
        const obs = [];
        // A single time Stamp for one
        // set of values being generated,
        // multicasted to each subscriber
        let timeStamp;
        // Return the subscriber function
        // (runs when subscribe() function
        // is invoked)
        return (ob) => {
            // When this is the first subscription,
            // start the sequence
            if (obs.length === 1) {
                timeStamp = this.exec_Sequence({
                    next(val) {
                        // Iterate through observers
                        // and notify all subscriptions
                        obs.forEach(o => o.next(val));
                    complete() {
                        // Notify all complete callbacks
                        obs.slice(0).forEach(o => o.complete());
                }, seq, 0);
            return {
                // Unsubscribe from the observers
                unsubscribe() {
                    obs.splice(obs.indexOf(ob), 1);
                    // Cleanup
                    if (obs.length === 0) {
    // Executes the sequence
    exec_Sequence(observer, sequence, index) {
        return setTimeout(() => {
            if (index === sequence.length - 1) {
            } else {
                this.exec_Sequence(observer, sequence, ++index);
        }, 1000);
    // Create a new Observable that will
    // deliver the above sequence

  • 输出:

  • error:它是错误通知的处理程序。一旦发生错误,可观察对象实例的执行即告终止。


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
    selector: 'app-error-example',
    templateUrl: './error-example.component.html',
    styleUrls: ['./error-example.component.css']
export class ErrorExampleComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        // Create a new Observable
        const sqnc = new Observable(generateError);
        // Execute the Observable and print the
        // result of each notification
        // error() is called when producing the next
        // value throws an error
            next(num) { },
            error(err) { console.log('Error Somewhere')}
        // This function runs when subscribe() is called
        function generateError(observer){
            // Calls the next observable notification
            // It generates an error and error is called
            observer.next( adddlert("Welcome guest!"));
            // Unsubscribe after completing the sequence
            return {unsubscribe(){}};
  • 输出:

  • complete:它是一个处理程序,用于通知可观察对象的执行已经完成。


import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
    selector: 'app-complete-example',
    templateUrl: './complete-example.component.html',
    styleUrls: ['./complete-example.component.css']
export class CompleteExampleComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        // Create a new Observable
        const sqnc = new Observable(countOnetoTen);
        // Execute the Observable and print the
        // result of each notification
            next(num) { console.log(num); },
        // This function runs when subscribe()
        // is called
        function countOnetoTen(observer){
            for(var i = 1; i <= 10; i++) {
                // Calls the next observable
                // notification
            // Unsubscribe after completing
            // the sequence
            return {unsubscribe(){}};
  • 输出:


// RxJS exports the class under the singular name `Observable`.
import {Observable} from 'rxjs'


import { Component } from '@angular/core';
import {Observable} from "rxjs";
import { CompileTemplateMetadata } from '@angular/compiler';
    selector: 'app-rt',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css']
export class AppComponent {
    title = 'MyFirstApp';
// Create a new Observable that will
// deliver the above sequence
const table = new Observable(tableOfTwo);
// Execute the Observable and print the
// result of each notification
// next() is the handler that receives each value
// emitted by tableOfTwo() through observer.next()
    next(num) { console.log(num); },
    complete() { console.log('Finished sequence'); }
// This function runs when subscribe() is called
function tableOfTwo(observer) {
    for(var i = 1; i <= 10; i++) {
      observer.next('2 * ' + i + ' = ' + i*2);
    return {unsubscribe(){}};

在此代码中,next() 用于接收可观察对象发出的下一个值。在可观察对象中,任务完成之后,订阅函数会返回一个带有 unsubscribe() 方法的对象;调用该方法会取消对可观察对象的订阅,此后无法再接收更多的值。调用 complete() 方法时,它将打印字符串“Finished sequence”。所有输出都显示在控制台中。



import { Component, OnInit } from '@angular/core';
import {Observable} from 'rxjs';
    selector: 'app-my-page',
    templateUrl: './my-page.component.html',
    styleUrls: ['./my-page.component.css']
export class MyPageComponent implements OnInit {
    constructor() { }
    ngOnInit() {
        const multiSeq = new Observable(this.multiSeqSubs());
            next(num) { console.log('1st subscribe: ' + num); },
            complete() { console.log('1st sequence finished.'); }
        // Subscribe again After 1 seconds.
        setTimeout(() => {
                next(num) { console.log('2nd subscribe: ' + num); },
                complete() { console.log('2nd sequence finished.'); }
        }, 1000);
    multiSeqSubs() {
        const seq = [];
        for (var i = 1; i <= 10; i++) {
            // Pushes the string onto sequence
            seq.push('2 * ' + i + '=' + 2*i)
        // Keep track of each observer
        const obs = [];
        // A single time Stamp for one
        // set of values being generated,
        // multicasted to each subscriber
        let timeStamp;
        // Return the subscriber function
        // (runs when subscribe() function
        // is invoked)
        return (ob) => {
            // When this is the first subscription,
            // start the sequence
            if (obs.length === 1) {
                timeStamp = this.exec_Sequence({
                    next(val) {
                        // Iterate through observers
                        // and notify all subscriptions
                        obs.forEach(o => o.next(val));
                    complete() {
                        // Notify all complete callbacks
                        obs.slice(0).forEach(o => o.complete());
                }, seq, 0);
            return {
                // Unsubscribe from the observers
                unsubscribe() {
                    obs.splice(obs.indexOf(ob), 1);
                    // Cleanup
                    if (obs.length === 0) {
    // Executes the sequence
    exec_Sequence(observer, sequence, index) {
        return setTimeout(() => {
            if (index === sequence.length - 1) {
            } else {
                this.exec_Sequence(observer, sequence, ++index);
        }, 1000);
    // Create a new Observable that will
    // deliver the above sequence

该代码执行相同的功能,即处理多播操作。在这段代码中,我们有一个观察者列表,其长度取决于多播操作的订阅数。在此,在执行代码期间,我们只有 2 个正在执行的订阅操作,因此“obs”列表中只有 2 个元素。

可观察对象以异步方式产生值,因此 try/catch 无法有效地捕获错误,因为未被处理的错误可能导致代码停止运行,而不管当时正在运行的其他任务。相反,我们通过在观察者上指定 error 回调来处理错误。产生错误时,可观察对象会清理订阅并停止为该订阅生成值。可观察对象可以产生值(调用 next 回调),也可以结束执行——此时调用 complete 回调或 error 回调。


    next(val) { console.log('Next: ' + val)},
    error(err) { console.log('Error: ' + err)}